Add writable BatchStream (#2)
vweevers authored Mar 25, 2022
1 parent 7194c99 commit 2b892a5
Showing 5 changed files with 315 additions and 11 deletions.
40 changes: 37 additions & 3 deletions README.md
@@ -1,8 +1,8 @@
# level-web-stream

**Read from an [`abstract-level`](https://github.com/Level/abstract-level) database using [Web Streams](https://developer.mozilla.org/en-US/docs/Web/API/Streams_API).** Compatible with browsers and Node.js.
**Read and write to an [`abstract-level`](https://github.com/Level/abstract-level) database using [Web Streams](https://developer.mozilla.org/en-US/docs/Web/API/Streams_API).** Compatible with browsers and Node.js.

> :pushpin: To instead consume data using Node.js streams, see [`level-read-stream`](https://github.com/Level/read-stream).
> :pushpin: To instead consume data using Node.js streams, see [`level-read-stream`](https://github.com/Level/read-stream). On Node.js 16, `level-read-stream` is ~3 times faster than `level-web-stream` and the performance of Web Streams isn't likely to improve anytime soon. On the other hand, Web Streams are a step towards a standard library for JavaScript (across Node.js, Deno and browsers).
[![level badge][level-badge]](https://github.com/Level/awesome)
[![npm](https://img.shields.io/npm/v/level-web-stream.svg)](https://www.npmjs.com/package/level-web-stream)
@@ -15,6 +15,8 @@

## Usage

### Reading

```js
const { EntryStream } = require('level-web-stream')
const { MemoryLevel } = require('memory-level')
@@ -62,6 +64,18 @@ await new KeyStream(db).pipeTo(new WritableStream({

Note that [`WritableStream`](https://developer.mozilla.org/en-US/docs/Web/API/WritableStream) is a global in browsers. In Node.js it can be imported from [`stream/web`](https://nodejs.org/api/webstreams.html).

### Writing

```js
const { EntryStream, BatchStream } = require('level-web-stream')

// Copy entries from db to db
const src = new EntryStream(db1)
const dst = new BatchStream(db2, { type: 'put' })

await src.pipeTo(dst)
```

## Install

With [npm](https://npmjs.org) do:
@@ -74,7 +88,7 @@ npm install level-web-stream

### `stream = new EntryStream(db[, options])`

Create a [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) that will yield entries. An entry is an array with `key` and `value` properties. The `db` argument must be an `abstract-level` database. The optional `options` object may contain:
Create a [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) that will yield entries. An entry is an array with two elements: a `key` and `value`. The `db` argument must be an `abstract-level` database. The optional `options` object may contain:

- `highWaterMark` (number): the maximum number of entries to buffer internally before ceasing to read further entries. Default 1000.

@@ -88,6 +102,26 @@ Same as `EntryStream` but yields keys instead of entries, using `db.keys()` inst

Same as `EntryStream` but yields values instead of entries, using `db.values()` instead of `db.iterator()`. If only values are needed, using `ValueStream` may increase performance because keys won't have to be fetched.

### `stream = new BatchStream(db[, options])`

Create a [`WritableStream`](https://developer.mozilla.org/en-US/docs/Web/API/WritableStream) that takes _operations_ or _entries_, to be written to the database in batches of fixed size using `db.batch()`. If a batch fails the stream will be aborted but previous batches that did succeed will not be reverted.

An _operation_ is an object containing:

- A `key` property (required)
- A `type` property (optional, one of `'put'`, `'del'`)
- A `value` property (required if type is `'put'`, ignored if type is `'del'`)
- Other operation properties accepted by `db.batch()`.

An _entry_ is an array with two elements: a `key` and `value`. This allows piping a readable `EntryStream` into a writable `BatchStream`. Writing `[key, value]` is the same as writing `{ key, value }`.

The `db` argument must be an `abstract-level` database. The optional `options` object may contain:

- `highWaterMark` (number, default 500): the maximum number of operations to buffer internally before committing them to the database with `db.batch()`. No data will be committed until `highWaterMark` is reached or until the stream is closing. Increasing `highWaterMark` can improve throughput at the cost of memory usage and at risk of blocking the JavaScript event loop while `db.batch()` is copying data.
- `type` (string, default `'put'`): default operation `type` if not set on individual operations or entries (which can't set it). Must be `'put'` or `'del'`.

Any other options are forwarded to the `options` argument of `db.batch(operations, options)` thus following the same rules of precedence (of options versus operation properties).
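
The handling of entries versus operations described above can be sketched as a standalone function. This is only an illustration of the documented behavior, not the library's code: `normalizeOperation` is a hypothetical helper name, and the sample keys and values are made up.

```js
// Hypothetical helper (not part of level-web-stream) that mirrors how
// BatchStream is documented to treat entries and operations.
function normalizeOperation (input, defaultType = 'put') {
  if (Array.isArray(input)) {
    // An entry is [key, value] and cannot carry its own type
    const [key, value] = input
    return { type: defaultType, key, value }
  }

  // An operation keeps its own type if it has one
  return input.type == null ? { ...input, type: defaultType } : input
}

const normalized = [
  normalizeOperation(['a', '1']), // entry, default type
  normalizeOperation({ key: 'b', value: '2' }), // operation without type
  normalizeOperation({ type: 'del', key: 'c' }), // operation with its own type
  normalizeOperation(['d', '4'], 'del') // entry, stream-level type 'del'
]
```

Under this sketch, writing `['a', '1']` and writing `{ key: 'a', value: '1' }` produce the same operation, which is what allows an `EntryStream` to be piped into a `BatchStream`.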

## Contributing

[`Level/web-stream`](https://github.com/Level/web-stream) is an **OPEN Open Source Project**. This means that:
43 changes: 42 additions & 1 deletion index.d.ts
@@ -3,7 +3,9 @@ import {
AbstractLevel,
AbstractIteratorOptions,
AbstractKeyIteratorOptions,
AbstractValueIteratorOptions
AbstractValueIteratorOptions,
AbstractBatchOptions,
AbstractBatchOperation
} from 'abstract-level'

/**
@@ -74,3 +76,42 @@ declare interface LevelReadableStreamOptions {
*/
fillCache?: boolean | undefined
}

/**
 * A {@link WritableStream} that takes _operations_ or _entries_.
 */
export class BatchStream<K, V, TDatabase = AbstractLevel<any, any, any>>
  extends WritableStream<AbstractBatchOperation<TDatabase, K, V> | [K, V]> {
  /**
   * Create a {@link WritableStream} that takes _operations_ or _entries_, to be
   * written to {@link db} in batches of fixed size using `db.batch()`.
   *
   * @param db Database to write to.
   * @param options Options for the stream and `db.batch()`.
   */
  constructor (
    db: TDatabase,
    options?: (BatchStreamOptions & AbstractBatchOptions<K, V>) | undefined
  )
}

/**
 * Stream options for {@link BatchStream}.
 */
declare interface BatchStreamOptions {
  /**
   * The maximum number of operations to buffer internally before
   * committing them to the database with `db.batch()`.
   *
   * @defaultValue `500`
   */
  highWaterMark?: number | undefined

  /**
   * Default operation `type` if not set on individual operations or entries (which can't
   * set it).
   *
   * @defaultValue `'put'`
   */
  type?: 'put' | 'del'
}
65 changes: 62 additions & 3 deletions index.js
@@ -1,12 +1,16 @@
'use strict'

const { ReadableStream, CountQueuingStrategy } = require('./streams')
const { ReadableStream, CountQueuingStrategy, WritableStream } = require('./streams')

const kCanceled = Symbol('canceled')
const kIterator = Symbol('iterator')
const kOperations = Symbol('operations')
const kBatchSize = Symbol('batchSize')
const kBatchType = Symbol('batchType')
const kBatchOptions = Symbol('batchOptions')
const kDb = Symbol('db')

class LevelReadableSource {
class LevelSource {
constructor (iterator) {
this[kIterator] = iterator
this[kCanceled] = false
@@ -51,7 +55,7 @@ class LevelReadableStream extends ReadableStream {
constructor (db, method, options) {
const { highWaterMark, ...rest } = options || {}
const iterator = db[method](rest)
const source = new LevelReadableSource(iterator)
const source = new LevelSource(iterator)
const queueingStrategy = new CountQueuingStrategy({
highWaterMark: highWaterMark || 1000
})
@@ -81,6 +85,61 @@ class ValueStream extends LevelReadableStream {
}
}

class LevelSink {
  constructor (db, batchSize, batchType, batchOptions) {
    this[kDb] = db
    this[kOperations] = []
    this[kBatchSize] = batchSize
    this[kBatchType] = batchType
    this[kBatchOptions] = batchOptions
  }

  write (operation) {
    if (Array.isArray(operation)) {
      operation = {
        key: operation[0],
        value: operation[1],
        type: this[kBatchType]
      }
    } else if (operation.type == null) {
      operation = Object.assign({}, operation, {
        type: this[kBatchType]
      })
    }

    const length = this[kOperations].push(operation)

    // Flush if we have a full batch
    if (length >= this[kBatchSize]) {
      const operations = this[kOperations].splice(0, length)
      return this[kDb].batch(operations, this[kBatchOptions])
    }
  }

  close () {
    // Flush remainder if any, returning a promise
    if (this[kOperations].length > 0) {
      return this[kDb].batch(this[kOperations], this[kBatchOptions])
    }
  }
}

class BatchStream extends WritableStream {
  constructor (db, options) {
    let { highWaterMark, type, ...batchOptions } = options || {}

    // Note there are two buffers. Unfortunately Web Streams have no _writev() equivalent
    highWaterMark = highWaterMark || 500
    type = type || 'put'

    const sink = new LevelSink(db, highWaterMark, type, batchOptions)
    const queueingStrategy = new CountQueuingStrategy({ highWaterMark })

    super(sink, queueingStrategy)
  }
}

exports.EntryStream = EntryStream
exports.KeyStream = KeyStream
exports.ValueStream = ValueStream
exports.BatchStream = BatchStream
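
The buffer-then-flush pattern of the sink above can be observed in isolation with a stub database. The following is a rough sketch under stated assumptions, not the code from this commit: `RecordingDb` and `BufferingSink` are hypothetical names, the stub only records the keys of each batch, and entry normalization and batch options are left out.

```js
// Hypothetical stand-in for an abstract-level database. It only records the
// keys of each batch it receives, so that the chunking becomes visible.
class RecordingDb {
  constructor () {
    this.batches = []
  }

  batch (operations) {
    this.batches.push(operations.map((op) => op.key))
    return Promise.resolve()
  }
}

// Simplified sink that follows the same buffer-then-flush pattern
class BufferingSink {
  constructor (db, batchSize) {
    this.db = db
    this.batchSize = batchSize
    this.operations = []
  }

  write (operation) {
    const length = this.operations.push(operation)

    // Flush only once a full batch has accumulated
    if (length >= this.batchSize) {
      return this.db.batch(this.operations.splice(0, length))
    }
  }

  close () {
    // Flush whatever is left, which may be less than a full batch
    if (this.operations.length > 0) {
      return this.db.batch(this.operations.splice(0))
    }
  }
}

const recordingDb = new RecordingDb()
const sink = new BufferingSink(recordingDb, 2)

// Drive the sink by hand. A real WritableStream would call write() once per
// chunk and wait for any returned promise before the next call.
for (const key of ['a', 'b', 'c', 'd', 'e']) {
  sink.write({ type: 'put', key, value: key.toUpperCase() })
}

const batchesBeforeClose = recordingDb.batches.length
sink.close()
```

With a batch size of 2 and five operations, two full batches are committed while writing and the single leftover operation is only committed by `close()`. This matches the README note that no data is committed until `highWaterMark` is reached or the stream is closing.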
6 changes: 4 additions & 2 deletions package.json
@@ -1,11 +1,13 @@
{
"name": "level-web-stream",
"version": "1.0.0",
"description": "Read from an abstract-level database using Web Streams",
"description": "Read and write to an abstract-level database using Web Streams",
"license": "MIT",
"scripts": {
"test": "standard && ts-standard *.ts && hallmark && npm run test-browsers-local && (nyc -s node test.js | faucet) && nyc report",
"test": "npm run lint && npm run test-browsers-local && npm run test-node",
"test-node": "(nyc -s node test.js | faucet) && nyc report",
"test-browsers-local": "airtap --coverage test.js",
"lint": "standard && ts-standard *.ts && hallmark",
"coverage": "nyc report -r lcovonly"
},
"main": "index.js",