Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 33 additions & 19 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,43 @@ const createMutex = (name, options) => {

return {
readLock: (fn) => {
if (!readQueue) {
readQueue = new Queue({
concurrency: options.concurrency,
autoStart: false
})

const localReadQueue = readQueue

masterQueue.add(() => {
localReadQueue.start()

return localReadQueue.onIdle()
.then(() => {
if (readQueue === localReadQueue) {
readQueue = null
}
})
})
// If there's already a read queue, just add the task to it
if (readQueue) {
return readQueue.add(() => timeout(fn(), options.timeout))
}

return readQueue.add(() => timeout(fn(), options.timeout))
// Create a new read queue
readQueue = new Queue({
concurrency: options.concurrency,
autoStart: false
})
const localReadQueue = readQueue

// Add the task to the read queue
const readPromise = readQueue.add(() => timeout(fn(), options.timeout))

masterQueue.add(() => {
// Start the task only once the master queue has completed processing
// any previous tasks
localReadQueue.start()

// Once all the tasks in the read queue have completed, remove it so
// that the next read lock will occur after any write locks that were
// started in the interim
return localReadQueue.onIdle()
.then(() => {
if (readQueue === localReadQueue) {
readQueue = null
}
})
})

return readPromise
},
writeLock: (fn) => {
// Remove the read queue reference, so that any later read locks will be
// added to a new queue that starts after this write lock has been
// released
readQueue = null

return masterQueue.add(() => timeout(fn(), options.timeout))
Expand Down
19 changes: 19 additions & 0 deletions test/fixtures/process-error-handling.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
const mortice = require('../../')

const mutex = mortice()

mutex.readLock(() => {
return new Promise((resolve, reject) => {
console.info('read 1')

reject(new Error('err'))
})
})

mutex.writeLock(() => {
return new Promise((resolve, reject) => {
console.info('write 1')

resolve()
})
})
22 changes: 22 additions & 0 deletions test/fixtures/process-read-then-write.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
const mortice = require('../../')

const mutex = mortice()

mutex.readLock(() => {
return new Promise((resolve, reject) => {
console.info('read 1')

setTimeout(() => {
console.info('read 1 complete')
resolve()
}, 500)
})
})

mutex.writeLock(() => {
return new Promise((resolve, reject) => {
console.info('write 1')

resolve()
})
})
17 changes: 17 additions & 0 deletions test/process.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,20 @@ write 2
read 4`)
})
})

test('executes read then waits to start write', (t) => {
return exec('node', [path.join(__dirname, 'fixtures', 'process-read-then-write.js')])
.then(result => {
t.is(result.stdout, `read 1
read 1 complete
write 1`)
})
})

test('continues processing after error', (t) => {
return exec('node', [path.join(__dirname, 'fixtures', 'process-error-handling.js')])
.then(result => {
t.is(result.stdout, `read 1
write 1`)
})
})