Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP feat: Async-Iterator interface #1850

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions greenkeeper.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"default": {
"packages": [
"package.json",
"packages/async-iterator/package.json",
"packages/binding-abstract/package.json",
"packages/binding-mock/package.json",
"packages/bindings/package.json",
Expand Down
4 changes: 4 additions & 0 deletions packages/async-iterator/.npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.DS_Store
*.test.js
*.node-10-test.js
CHANGELOG.md
13 changes: 13 additions & 0 deletions packages/async-iterator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# @serialport/AsyncIterator

This is a node SerialPort project! This package does some neat stuff.

- [Guides and API Docs](https://serialport.io/)

This is why you'd use it.

This is how you use it.
```js
const asyncIterator = new AsyncIterator()

```
88 changes: 88 additions & 0 deletions packages/async-iterator/lib/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
const debug = require('debug')('serialport/async-iterator')

/**
* An AsyncIterator that does something pretty cool.
* @param {Object} options open options
* @example ```
// To use the `AsyncIterator` interface:
const { open } = require('@serialport/async-iterator')
const Binding = require('@serialport/bindings')
const ports = await Binding.list()
const arduinoPort = ports.find(info => (info.manufacture || '').includes('Arduino'))
const port = await open(arduinoPort)

// read bytes until close
for await (const bytes of port) {
console.log(`read ${bytes.length} bytes`)
}

// read 12 bytes
const { value, end } = await port.next(12)
console.log(`read ${value.length} bytes / port closed: ${end}`)

// write a buffer
await port.write(Buffer.from('hello!'))
```
*/

/**
* Wrap an async function so that subsequent calls are queued behind the previous promise resolving
*/
const promiseQueue = func => {
const queuedFunc = (...args) => {
queuedFunc.previousCall = queuedFunc.previousCall.then(() => func(...args))
return queuedFunc.previousCall
}
queuedFunc.previousCall = Promise.resolve()
return queuedFunc
}

const open = async ({ Binding, readSize = 1024, path, bindingOptions = {}, ...openOptions }) => {
const binding = new Binding(bindingOptions)
debug('opening with', { path, openOptions })
await binding.open(path, openOptions)

const next = async (bytesToRead = readSize) => {
if (!binding.isOpen) {
debug('next: port is closed')
return { value: undefined, done: true }
}

const readBuffer = Buffer.allocUnsafe(bytesToRead)
try {
debug(`next: read starting`)
const { bytesRead } = await binding.read(readBuffer, 0, bytesToRead)
debug(`next: read ${bytesRead} bytes`)
const value = readBuffer.slice(0, bytesRead)
return { value, done: false }
} catch (error) {
if (error.canceled) {
debug(`next: read canceled`)
return { value: undefined, done: true }
}
debug(`next: read error ${error.message}`)
throw error
}
}

const port = {
[Symbol.asyncIterator]: () => port,
next: promiseQueue(next),
write: promiseQueue(data => binding.write(data)),
close: () => binding.close(),
update: opt => binding.update(opt),
set: opt => binding.set(opt),
get: () => binding.get(),
flush: () => binding.flush(),
drain: () => binding.drain(),
binding,
get isOpen() {
return binding.isOpen
},
}
return port
}

module.exports = {
open,
}
86 changes: 86 additions & 0 deletions packages/async-iterator/lib/index.node-10-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
const assert = require('assert')
const { open } = require('./index')
const MockBinding = require('@serialport/binding-mock')

const testPath = '/dev/coolPort'
describe('AsyncIterator', () => {
if (process.versions.node.split('.')[0] < 10) {
// eslint-disable-next-line mocha/no-pending-tests
it('Requires Node 10 or higher')
return
}
describe('.open', () => {
beforeEach(() => {
MockBinding.reset()
MockBinding.createPort(testPath)
})
it('Opens port', async () => {
const port = await open({ Binding: MockBinding, path: testPath })
assert.strictEqual(port.isOpen, true)
await port.close()
assert.strictEqual(port.isOpen, false)
})
})
describe('reading', () => {
beforeEach(() => {
MockBinding.reset()
MockBinding.createPort(testPath)
})
it('reads data', async () => {
const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 })
const buffers = []
const testData = Buffer.from('This is test data.')
port.binding.emitData(testData)
for await (const bytes of port) {
buffers.push(bytes)
// if we're done reading available data close the port
if (port.binding.port.data.length === 0) {
await port.close()
}
}
assert.deepStrictEqual(Buffer.concat(buffers), testData, 'data does not match')
assert.strictEqual(buffers.length, testData.length)
})
it('deals with concurrent reads', async () => {
const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 })
const testData = Buffer.from('This is test data.')
port.binding.emitData(testData)
const buffers = await Promise.all([port.next(testData.length / 2), port.next(testData.length / 2)])
assert.deepStrictEqual(Buffer.concat(buffers.map(itr => itr.value)), testData, 'data does not match')
await port.close()
})
it('deals with huge read requests', async () => {
const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 })
const testData = Buffer.from('This is test data.')
port.binding.emitData(testData)
const data = await port.next(10000)
assert.deepStrictEqual(data.value, testData)
await port.close()
})
it('deals with the port being closed', async () => {
const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 })
const read = port.next()
await port.close()
assert.deepStrictEqual(await read, { value: undefined, done: true })
})
})
describe('writes', () => {
beforeEach(() => {
MockBinding.reset()
MockBinding.createPort(testPath)
})
it('writes data', async () => {
const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 })
const testData = Buffer.from('This is test data.')
await port.write(testData)
assert.deepStrictEqual(port.binding.lastWrite, testData)
})
it('queues writes', async () => {
const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 })
const testData = Buffer.from('This is test data.')
const testData2 = Buffer.from('this is also test data')
await Promise.all([port.write(testData), port.write(testData2)])
assert.deepStrictEqual(port.binding.lastWrite, testData2)
})
})
})
8 changes: 8 additions & 0 deletions packages/async-iterator/lib/index.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
if (process.versions.node.split('.')[0] < 10) {
describe('AsyncIterator', () => {
// eslint-disable-next-line mocha/no-pending-tests
it('Requires Node 10 or higher')
})
} else {
require('./index.node-10-test')
}
22 changes: 22 additions & 0 deletions packages/async-iterator/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"name": "@serialport/async-iterator",
"version": "8.0.5",
"main": "lib/async-iterator.js",
"dependencies": {
"debug": "^4.1.1"
},
"devDependencies": {
"@serialport/binding-mock": "^8.0.4"
},
"engines": {
"node": ">=10.3.0"
},
"publishConfig": {
"access": "public"
},
"license": "MIT",
"repository": {
"type": "git",
"url": "git://github.com/node-serialport/node-serialport.git"
}
}