Skip to content

Commit a0aa3b6

Browse files
authored
Merge pull request #1 from leptos-rs/raskyld/migrate-integration-wasi
feat: migrate integration crate off-tree
2 parents e541169 + e3ee55d commit a0aa3b6

File tree

7 files changed

+1195
-0
lines changed

7 files changed

+1195
-0
lines changed

Cargo.toml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
[package]
2+
name = "leptos_wasi"
3+
authors = ["Enzo Nocera"]
4+
license = "MIT"
5+
repository = "https://github.com/leptos-rs/leptos_wasi"
6+
description = "WASI integrations for the Leptos web framework."
7+
version = "0.0.0"
8+
edition = "2021"
9+
10+
[dependencies]
11+
any_spawner = { version = "0.1.1", features = ["futures-executor"] }
12+
throw_error = { version = "0.2.0-rc0" }
13+
hydration_context = { version = "0.2.0-rc0" }
14+
futures = "0.3.30"
15+
wasi = "0.13.1+wasi-0.2.0"
16+
leptos = { version = "0.7.0-rc0", features = ["nonce", "ssr"] }
17+
leptos_meta = { version = "0.7.0-rc0", features = ["ssr"] }
18+
leptos_router = { version = "0.7.0-rc0", features = ["ssr"] }
19+
leptos_macro = { version = "0.7.0-rc0", features = ["generic"] }
20+
leptos_integration_utils = {version = "0.7.0-rc0" }
21+
server_fn = { version = "0.7.0-rc0", features = ["generic"] }
22+
http = "1.1.0"
23+
parking_lot = "0.12.3"
24+
bytes = "1.7.2"
25+
routefinder = "0.5.4"
26+
mime_guess = "2.0"
27+
thiserror = "1.0.65"

src/executor.rs

+280
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
//! This is (Yet Another) Async Runtime for WASI with first-class support
2+
//! for `.await`-ing on [`Pollable`]. It is an ad-hoc implementation
3+
//! tailored for Leptos but it could be exported into a standalone crate.
4+
//!
5+
//! It is based on the `futures` crate's [`LocalPool`] and makes use of
6+
//! no `unsafe` code.
7+
//!
8+
//! # Performance Notes
9+
//!
10+
//! I haven't benchmarked this runtime but since it makes no use of unsafe code
11+
//! and Rust `core`'s `Context` was prematurely optimised for multi-threading
12+
//! environment, I had no choice but using synchronisation primitives to make
13+
//! the API happy.
14+
//!
15+
//! IIRC, `wasm32` targets have an implementation of synchronisation primitives
16+
//! that are just stubs, downgrading them to their single-threaded counterpart
17+
//! so the overhead should be minimal.
18+
//!
19+
//! Also, you can customise the behaviour of the [`Executor`] using the
20+
//! [`Mode`] enum to trade-off reactivity for less host context switch
21+
//! with the [`Mode::Stalled`] variant.
22+
23+
use std::{
24+
cell::RefCell,
25+
future::Future,
26+
mem,
27+
rc::Rc,
28+
sync::{Arc, OnceLock},
29+
task::{Context, Poll, Wake, Waker},
30+
};
31+
32+
use any_spawner::CustomExecutor;
33+
use futures::{
34+
channel::mpsc::{UnboundedReceiver, UnboundedSender},
35+
executor::{LocalPool, LocalSpawner},
36+
task::{LocalSpawnExt, SpawnExt},
37+
FutureExt, Stream,
38+
};
39+
use parking_lot::Mutex;
40+
use wasi::{
41+
clocks::monotonic_clock::{subscribe_duration, Duration},
42+
io::poll::{poll, Pollable},
43+
};
44+
45+
struct TableEntry(Pollable, Waker);
46+
47+
static POLLABLE_SINK: OnceLock<UnboundedSender<TableEntry>> = OnceLock::new();
48+
49+
pub async fn sleep(duration: Duration) {
50+
WaitPoll::new(subscribe_duration(duration)).await
51+
}
52+
53+
pub struct WaitPoll(WaitPollInner);
54+
55+
enum WaitPollInner {
56+
Unregistered(Pollable),
57+
Registered(Arc<WaitPollWaker>),
58+
}
59+
60+
impl WaitPoll {
61+
pub fn new(pollable: Pollable) -> Self {
62+
Self(WaitPollInner::Unregistered(pollable))
63+
}
64+
}
65+
66+
impl Future for WaitPoll {
67+
type Output = ();
68+
69+
fn poll(
70+
self: std::pin::Pin<&mut Self>,
71+
cx: &mut Context<'_>,
72+
) -> Poll<Self::Output> {
73+
match &mut self.get_mut().0 {
74+
this @ WaitPollInner::Unregistered(_) => {
75+
let waker = Arc::new(WaitPollWaker::new(cx.waker()));
76+
77+
if let Some(sender) = POLLABLE_SINK.get() {
78+
if let WaitPollInner::Unregistered(pollable) = mem::replace(
79+
this,
80+
WaitPollInner::Registered(waker.clone()),
81+
) {
82+
sender
83+
.clone()
84+
.unbounded_send(TableEntry(pollable, waker.into()))
85+
.expect("cannot spawn a new WaitPoll");
86+
87+
Poll::Pending
88+
} else {
89+
unreachable!();
90+
}
91+
} else {
92+
panic!(
93+
"cannot create a WaitPoll before creating an Executor"
94+
);
95+
}
96+
}
97+
WaitPollInner::Registered(waker) => {
98+
let mut lock = waker.0.lock();
99+
if lock.done {
100+
Poll::Ready(())
101+
} else {
102+
// How can it happen?! :O
103+
// Well, if, for some reason, the Task get woken up for
104+
// another reason than the pollable associated with this
105+
// WaitPoll got ready.
106+
//
107+
// We need to make sure we update the waker.
108+
lock.task_waker = cx.waker().clone();
109+
Poll::Pending
110+
}
111+
}
112+
}
113+
}
114+
}
115+
116+
struct WaitPollWaker(Mutex<WaitPollWakerInner>);
117+
118+
struct WaitPollWakerInner {
119+
done: bool,
120+
task_waker: Waker,
121+
}
122+
123+
impl WaitPollWaker {
124+
fn new(waker: &Waker) -> Self {
125+
Self(Mutex::new(WaitPollWakerInner {
126+
done: false,
127+
task_waker: waker.clone(),
128+
}))
129+
}
130+
}
131+
132+
impl Wake for WaitPollWaker {
133+
fn wake(self: std::sync::Arc<Self>) {
134+
self.wake_by_ref();
135+
}
136+
137+
fn wake_by_ref(self: &std::sync::Arc<Self>) {
138+
let mut lock = self.0.lock();
139+
lock.task_waker.wake_by_ref();
140+
lock.done = true;
141+
}
142+
}
143+
144+
/// Controls how often the [`Executor`] checks for [`Pollable`] readiness.
145+
pub enum Mode {
146+
/// Will check as often as possible for readiness, this have some
147+
/// performance overhead.
148+
Premptive,
149+
150+
/// Will only check for readiness when no more progress can be made
151+
/// on pooled Futures.
152+
Stalled,
153+
}
154+
155+
#[derive(Clone)]
156+
pub struct Executor(Rc<ExecutorInner>);
157+
158+
struct ExecutorInner {
159+
pool: RefCell<LocalPool>,
160+
spawner: LocalSpawner,
161+
rx: RefCell<UnboundedReceiver<TableEntry>>,
162+
mode: Mode,
163+
}
164+
165+
impl Executor {
166+
pub fn new(mode: Mode) -> Self {
167+
let pool = LocalPool::new();
168+
let spawner = pool.spawner();
169+
let (tx, rx) = futures::channel::mpsc::unbounded();
170+
171+
POLLABLE_SINK
172+
.set(tx.clone())
173+
.expect("calling Executor::new two times is not supported");
174+
175+
Self(Rc::new(ExecutorInner {
176+
pool: RefCell::new(pool),
177+
spawner,
178+
rx: RefCell::new(rx),
179+
mode,
180+
}))
181+
}
182+
183+
pub fn run_until<T>(&self, fut: T) -> T::Output
184+
where
185+
T: Future + 'static,
186+
{
187+
let (tx, mut rx) = futures::channel::oneshot::channel::<T::Output>();
188+
self.spawn_local(Box::pin(fut.then(|val| async move {
189+
if tx.send(val).is_err() {
190+
panic!("failed to send the return value of the future passed to run_until");
191+
}
192+
})));
193+
194+
loop {
195+
match rx.try_recv() {
196+
Err(_) => panic!(
197+
"internal error: sender of run until has been dropped"
198+
),
199+
Ok(Some(val)) => return val,
200+
Ok(None) => {
201+
self.poll_local();
202+
}
203+
}
204+
}
205+
}
206+
}
207+
208+
impl CustomExecutor for Executor {
209+
fn spawn(&self, fut: any_spawner::PinnedFuture<()>) {
210+
self.0.spawner.spawn(fut).unwrap();
211+
}
212+
213+
fn spawn_local(&self, fut: any_spawner::PinnedLocalFuture<()>) {
214+
self.0.spawner.spawn_local(fut).unwrap();
215+
}
216+
217+
fn poll_local(&self) {
218+
let mut pool = match self.0.pool.try_borrow_mut() {
219+
Ok(pool) => pool,
220+
// Nested call to poll_local(), noop.
221+
Err(_) => return,
222+
};
223+
224+
match self.0.mode {
225+
Mode::Premptive => {
226+
pool.try_run_one();
227+
}
228+
Mode::Stalled => pool.run_until_stalled(),
229+
};
230+
231+
let (lower, upper) = self.0.rx.borrow().size_hint();
232+
let capacity = upper.unwrap_or(lower);
233+
let mut entries = Vec::with_capacity(capacity);
234+
let mut rx = self.0.rx.borrow_mut();
235+
236+
loop {
237+
match rx.try_next() {
238+
Ok(None) => break,
239+
Ok(Some(entry)) => {
240+
entries.push(Some(entry));
241+
}
242+
Err(_) => break,
243+
}
244+
}
245+
246+
if entries.is_empty() {
247+
// This could happen if some Futures use Waker that are not
248+
// registered through [`WaitPoll`] or that we are blocked
249+
// because some Future returned `Poll::Pending` without
250+
// actually making sure their Waker is called at some point.
251+
return;
252+
}
253+
254+
let pollables = entries
255+
.iter()
256+
.map(|entry| &entry.as_ref().unwrap().0)
257+
.collect::<Vec<_>>();
258+
259+
let ready = poll(&pollables);
260+
261+
if let Some(sender) = POLLABLE_SINK.get() {
262+
let sender = sender.clone();
263+
264+
// Wakes futures subscribed to ready pollable.
265+
for index in ready {
266+
let wake = entries[index as usize].take().unwrap().1;
267+
wake.wake();
268+
}
269+
270+
// Requeue not ready pollable.
271+
for entry in entries.into_iter().flatten() {
272+
sender
273+
.unbounded_send(entry)
274+
.expect("the sender channel is closed");
275+
}
276+
} else {
277+
unreachable!();
278+
}
279+
}
280+
}

0 commit comments

Comments
 (0)