Skip to content

Commit

Permalink
feat: fix support for stream.Readable in browsers
Browse files Browse the repository at this point in the history
- Since wasm-bindgen doesn't support a way to extend JavaScript
classes (rustwasm/wasm-bindgen#210), add a basic JavaScript shim, that
pushes the data available in Rust to the Readable in JavaScript.

- use channels and spawn_local in Rust to move along Stream iterator
and pass results to enqueue for read() (in the JavaScript shim)

- adds an abort controller that will abort the stream in JavaScript
is a stream error occurs in Rust
  • Loading branch information
enmand committed Apr 8, 2024
1 parent 64f45f5 commit ffc01ae
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 21 deletions.
16 changes: 16 additions & 0 deletions crates/dwn-rs-wasm/src/streams/readable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Provide a JavaScript shim for the WAS-based ReadableStream, so we can use it in
// the browser and we can use the ES6-style classes

const { Readable } = require("readable-stream");

const makeReadable = (readFn, abort) => {
return new Readable({
read(size) {
this.push(readFn(size));
},

signal: abort,
});
};

module.exports = { makeReadable };
53 changes: 32 additions & 21 deletions crates/dwn-rs-wasm/src/streams/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use futures_util::{pin_mut, StreamExt};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio_stream::Stream;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local;
use web_sys::AbortController;

use crate::streams::sys::EventEmitter;
use crate::streams::sys::make_readable;

use super::sys::Readable;

Expand Down Expand Up @@ -50,33 +52,42 @@ impl StreamReadable {
where
St: Stream<Item = Result<JsValue, JsValue>>,
{
// TODO: this is an extremely "hacky" implementation, that uses a legacy trait of
// streams in Node, and wraps an EventEmitter, emitting data from the Rust Stream,
// using the Readable.wrap, turning it into a proper Node ReadableStream. Once (if)
// dwn-sdk-js is on Web Streams, we can remove this (or someone can find a better way).
let ee = EventEmitter::new();
let readable = Readable::new().wrap(JsCast::unchecked_into::<Readable>(ee.clone()));
readable.resume();

pin_mut!(stream);
let (data_tx, mut data_rx) = unbounded_channel::<JsValue>();
let controller = AbortController::new().unwrap();

while let Some(item) = stream.next().await {
let item = match item {
Ok(i) => i,
Err(e) => {
if e.is_null() {
ee.emit("end", JsValue::NULL);
} else {
ee.emit("error", e);
let data_tx = data_tx.clone();
let controller = controller.clone();
spawn_local(async move {
match item {
Ok(i) => {
data_tx.send(i).unwrap();
}
Err(e) => {
if e.is_null() {
data_tx.send(JsValue::NULL).unwrap();
} else {
controller.abort();
}
}
return Self::new(readable);
}
};

ee.emit("data", item.clone());
});
}

Self::new(readable)
let newr = make_readable(
// TODO: the closure should take a `size` argument, and properly buffer the data
Closure::wrap(Box::new(move |_size| -> JsValue {
match data_rx.blocking_recv() {
Some(d) => d,
None => JsValue::NULL,
}
}) as Box<dyn FnMut(JsValue) -> JsValue>)
.into_js_value(),
controller.signal(),
);

Self::new(newr)
}

/// into_stream creates a new Stream from the StreamReadable stream. This function locks the StreamReadable in
Expand Down
7 changes: 7 additions & 0 deletions crates/dwn-rs-wasm/src/streams/sys.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use js_sys::{AsyncIterator, Function, Iterator, Object};
use wasm_bindgen::prelude::*;
use web_sys::AbortSignal;

#[wasm_bindgen(module = "events")]
extern "C" {
Expand Down Expand Up @@ -142,3 +143,9 @@ extern "C" {
#[derive(Debug, Clone)]
pub type PassThrough;
}

#[wasm_bindgen(module = "/src/streams/readable.js")]
extern "C" {
#[wasm_bindgen(js_name = makeReadable)]
pub fn make_readable(write: JsValue, abort: AbortSignal) -> Readable;
}

0 comments on commit ffc01ae

Please sign in to comment.