Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/misc/component-async-tests/wit/test.wit
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ interface backpressure {
interface transmit {
variant control {
read-stream(string),
read-stream-zero,
read-future(string),
write-stream(string),
write-stream-zero,
write-future(string),
}

Expand Down
14 changes: 13 additions & 1 deletion crates/test-programs/src/bin/async_transmit_callee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use {
wit_future, wit_stream,
},
std::future::IntoFuture,
wit_bindgen_rt::async_support::{self, FutureReader, StreamReader},
wit_bindgen_rt::async_support::{self, FutureReader, StreamReader, StreamResult},
};

struct Component;
Expand Down Expand Up @@ -48,6 +48,12 @@ impl Guest for Component {
Control::ReadStream(value) => {
assert_eq!(caller_stream_rx.next().await, Some(value));
}
Control::ReadStreamZero => {
assert_eq!(
caller_stream_rx.read(Vec::new()).await.0,
StreamResult::Complete(0)
);
}
Control::ReadFuture(value) => {
assert_eq!(
caller_future_rx1.take().unwrap().into_future().await,
Expand All @@ -57,6 +63,12 @@ impl Guest for Component {
Control::WriteStream(value) => {
assert!(callee_stream_tx.write_one(value).await.is_none());
}
Control::WriteStreamZero => {
assert_eq!(
callee_stream_tx.write(Vec::new()).await.0,
StreamResult::Complete(0)
);
}
Control::WriteFuture(value) => {
callee_future_tx1
.take()
Expand Down
64 changes: 61 additions & 3 deletions crates/test-programs/src/bin/async_transmit_caller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ use {
local::local::transmit::{self, Control},
wit_future, wit_stream,
},
futures::{future, FutureExt},
futures::{future, stream::FuturesUnordered, FutureExt, StreamExt},
std::{
future::{Future, IntoFuture},
pin::pin,
pin::{pin, Pin},
task::Poll,
},
wit_bindgen_rt::async_support::FutureWriteCancel,
wit_bindgen_rt::async_support::{FutureWriteCancel, StreamResult},
};

struct Component;
Expand Down Expand Up @@ -69,6 +69,38 @@ impl Guest for Component {
.is_none());
assert!(caller_stream_tx.write_one("c".into()).await.is_none());

// Tell the peer to do a zero-length read, do a zero-length write; assert the latter completes, then do a
// non-zero-length write, assert that it does _not_ complete, then tell the peer to do a non-zero-length
// read and assert that the write completes.
assert!(control_tx
.write_one(Control::ReadStreamZero)
.await
.is_none());
{
assert_eq!(
caller_stream_tx.write(Vec::new()).await.0,
StreamResult::Complete(0)
);

let send = Box::pin(caller_stream_tx.write_one("d".into()));
let Err(send) = poll(send).await else {
panic!()
};

let mut futures = FuturesUnordered::new();
futures.push(Box::pin(send.map(|v| {
assert!(v.is_none());
})) as Pin<Box<dyn Future<Output = _>>>);
futures.push(Box::pin(
control_tx
.write_one(Control::ReadStream("d".into()))
.map(|v| {
assert!(v.is_none());
}),
));
while let Some(()) = futures.next().await {}
}

// Start writing a value to the future, but cancel the write before telling the peer to read.
{
let send = Box::pin(caller_future_tx1.write("x".into()));
Expand Down Expand Up @@ -114,6 +146,32 @@ impl Guest for Component {
.is_none());
assert_eq!(callee_stream_rx.next().await, Some("b".into()));

// Tell the peer to do a zero-length write, assert that the read does _not_ complete, then tell the peer to
// do a non-zero-length write and assert that the read completes.
assert!(control_tx
.write_one(Control::WriteStreamZero)
.await
.is_none());
{
let next = Box::pin(callee_stream_rx.next());
let Err(next) = poll(next).await else {
panic!()
};

let mut futures = FuturesUnordered::new();
futures.push(Box::pin(next.map(|v| {
assert_eq!(v, Some("c".into()));
})) as Pin<Box<dyn Future<Output = _>>>);
futures.push(Box::pin(
control_tx
.write_one(Control::WriteStream("c".into()))
.map(|v| {
assert!(v.is_none());
}),
));
while let Some(()) = futures.next().await {}
}

// Start reading a value from the future, but cancel the read before telling the peer to write.
{
let next = Box::pin(callee_future_rx1.into_future());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2231,6 +2231,21 @@ impl ComponentInstance {
ReadState::Open
};

let set_guest_ready = |me: &mut Self| {
let transmit = me.get_mut(transmit_id)?;
assert!(matches!(&transmit.write, WriteState::Open));
transmit.write = WriteState::GuestReady {
ty,
flat_abi,
options,
address: usize::try_from(address).unwrap(),
count: usize::try_from(count).unwrap(),
handle,
post_write: PostWrite::Continue,
};
Ok::<_, crate::Error>(())
};

let result = match mem::replace(&mut transmit.read, new_state) {
ReadState::GuestReady {
ty: read_ty,
Expand All @@ -2239,10 +2254,12 @@ impl ComponentInstance {
address: read_address,
count: read_count,
handle: read_handle,
..
} => {
assert_eq!(flat_abi, read_flat_abi);

let write_complete = count == 0 || read_count > 0;
let read_complete = count > 0;

let read_handle_rep = transmit.read_handle.rep();

let count = count.min(read_count);
Expand All @@ -2261,7 +2278,8 @@ impl ComponentInstance {
)?;

let code = ReturnCode::Completed(count.try_into().unwrap());
{

if read_complete {
self.push_event(
read_handle_rep,
match read_ty {
Expand All @@ -2277,9 +2295,24 @@ impl ComponentInstance {
},
},
)?;
} else {
let transmit = self.get_mut(transmit_id)?;
transmit.read = ReadState::GuestReady {
ty: read_ty,
flat_abi: read_flat_abi,
options: read_options,
address: read_address,
count: read_count,
handle: read_handle,
};
}

code
if write_complete {
code
} else {
set_guest_ready(self)?;
ReturnCode::Blocked
}
}

ReadState::HostReady { accept } => {
Expand All @@ -2295,19 +2328,7 @@ impl ComponentInstance {
}

ReadState::Open => {
assert!(matches!(&transmit.write, WriteState::Open));

let transmit = self.get_mut(transmit_id)?;
transmit.write = WriteState::GuestReady {
ty,
flat_abi,
options,
address: usize::try_from(address).unwrap(),
count: usize::try_from(count).unwrap(),
handle,
post_write: PostWrite::Continue,
};

set_guest_ready(self)?;
ReturnCode::Blocked
}

Expand Down Expand Up @@ -2370,6 +2391,20 @@ impl ComponentInstance {
WriteState::Open
};

let set_guest_ready = |me: &mut Self| {
let transmit = me.get_mut(transmit_id)?;
assert!(matches!(&transmit.read, ReadState::Open));
transmit.read = ReadState::GuestReady {
ty,
flat_abi,
options,
address: usize::try_from(address).unwrap(),
count: usize::try_from(count).unwrap(),
handle,
};
Ok::<_, crate::Error>(())
};

let result = match mem::replace(&mut transmit.write, new_state) {
WriteState::GuestReady {
ty: write_ty,
Expand All @@ -2384,6 +2419,9 @@ impl ComponentInstance {

let write_handle_rep = transmit.write_handle.rep();

let write_complete = write_count == 0 || count > 0;
let read_complete = write_count > 0;

let count = usize::try_from(count).unwrap().min(write_count);

self.copy(
Expand All @@ -2407,7 +2445,8 @@ impl ComponentInstance {
};

let code = ReturnCode::Completed(count.try_into().unwrap());
{

if write_complete {
self.push_event(
write_handle_rep,
match write_ty {
Expand All @@ -2421,9 +2460,25 @@ impl ComponentInstance {
},
},
)?;
} else {
let transmit = self.get_mut(transmit_id)?;
transmit.write = WriteState::GuestReady {
ty: write_ty,
flat_abi: write_flat_abi,
options: write_options,
address: write_address,
count: write_count,
handle: write_handle,
post_write,
};
}

code
if read_complete {
code
} else {
set_guest_ready(self)?;
ReturnCode::Blocked
}
}

WriteState::HostReady { accept, post_write } => {
Expand All @@ -2445,18 +2500,7 @@ impl ComponentInstance {
}

WriteState::Open => {
assert!(matches!(&transmit.read, ReadState::Open));

let transmit = self.get_mut(transmit_id)?;
transmit.read = ReadState::GuestReady {
ty,
flat_abi,
options,
address: usize::try_from(address).unwrap(),
count: usize::try_from(count).unwrap(),
handle,
};

set_guest_ready(self)?;
ReturnCode::Blocked
}

Expand Down