diff --git a/crates/misc/component-async-tests/wit/test.wit b/crates/misc/component-async-tests/wit/test.wit index fce502e449..9519fc0854 100644 --- a/crates/misc/component-async-tests/wit/test.wit +++ b/crates/misc/component-async-tests/wit/test.wit @@ -63,8 +63,10 @@ interface backpressure { interface transmit { variant control { read-stream(string), + read-stream-zero, read-future(string), write-stream(string), + write-stream-zero, write-future(string), } diff --git a/crates/test-programs/src/bin/async_transmit_callee.rs b/crates/test-programs/src/bin/async_transmit_callee.rs index 928c17f131..8e831e0583 100644 --- a/crates/test-programs/src/bin/async_transmit_callee.rs +++ b/crates/test-programs/src/bin/async_transmit_callee.rs @@ -19,7 +19,7 @@ use { wit_future, wit_stream, }, std::future::IntoFuture, - wit_bindgen_rt::async_support::{self, FutureReader, StreamReader}, + wit_bindgen_rt::async_support::{self, FutureReader, StreamReader, StreamResult}, }; struct Component; @@ -48,6 +48,12 @@ impl Guest for Component { Control::ReadStream(value) => { assert_eq!(caller_stream_rx.next().await, Some(value)); } + Control::ReadStreamZero => { + assert_eq!( + caller_stream_rx.read(Vec::new()).await.0, + StreamResult::Complete(0) + ); + } Control::ReadFuture(value) => { assert_eq!( caller_future_rx1.take().unwrap().into_future().await, @@ -57,6 +63,12 @@ impl Guest for Component { Control::WriteStream(value) => { assert!(callee_stream_tx.write_one(value).await.is_none()); } + Control::WriteStreamZero => { + assert_eq!( + callee_stream_tx.write(Vec::new()).await.0, + StreamResult::Complete(0) + ); + } Control::WriteFuture(value) => { callee_future_tx1 .take() diff --git a/crates/test-programs/src/bin/async_transmit_caller.rs b/crates/test-programs/src/bin/async_transmit_caller.rs index bdf054b198..833c57d1de 100644 --- a/crates/test-programs/src/bin/async_transmit_caller.rs +++ b/crates/test-programs/src/bin/async_transmit_caller.rs @@ -22,13 +22,13 @@ use { local::local::transmit::{self, Control}, wit_future, wit_stream, }, - futures::{future, FutureExt}, + futures::{future, stream::FuturesUnordered, FutureExt, StreamExt}, std::{ future::{Future, IntoFuture}, - pin::pin, + pin::{pin, Pin}, task::Poll, }, - wit_bindgen_rt::async_support::FutureWriteCancel, + wit_bindgen_rt::async_support::{FutureWriteCancel, StreamResult}, }; struct Component; @@ -69,6 +69,38 @@ impl Guest for Component { .is_none()); assert!(caller_stream_tx.write_one("c".into()).await.is_none()); + // Tell the peer to do a zero-length read, do a zero-length write; assert the latter completes, then do a + // non-zero-length write, assert that it does _not_ complete, then tell the peer to do a non-zero-length + // read and assert that the write completes. + assert!(control_tx + .write_one(Control::ReadStreamZero) + .await + .is_none()); + { + assert_eq!( + caller_stream_tx.write(Vec::new()).await.0, + StreamResult::Complete(0) + ); + + let send = Box::pin(caller_stream_tx.write_one("d".into())); + let Err(send) = poll(send).await else { + panic!() + }; + + let mut futures = FuturesUnordered::new(); + futures.push(Box::pin(send.map(|v| { + assert!(v.is_none()); + })) as Pin>>); + futures.push(Box::pin( + control_tx + .write_one(Control::ReadStream("d".into())) + .map(|v| { + assert!(v.is_none()); + }), + )); + while let Some(()) = futures.next().await {} + } + // Start writing a value to the future, but cancel the write before telling the peer to read. { let send = Box::pin(caller_future_tx1.write("x".into())); @@ -114,6 +146,32 @@ impl Guest for Component { .is_none()); assert_eq!(callee_stream_rx.next().await, Some("b".into())); + // Tell the peer to do a zero-length write, assert that the read does _not_ complete, then tell the peer to + // do a non-zero-length write and assert that the read completes. + assert!(control_tx + .write_one(Control::WriteStreamZero) + .await + .is_none()); + { + let next = Box::pin(callee_stream_rx.next()); + let Err(next) = poll(next).await else { + panic!() + }; + + let mut futures = FuturesUnordered::new(); + futures.push(Box::pin(next.map(|v| { + assert_eq!(v, Some("c".into())); + })) as Pin>>); + futures.push(Box::pin( + control_tx + .write_one(Control::WriteStream("c".into())) + .map(|v| { + assert!(v.is_none()); + }), + )); + while let Some(()) = futures.next().await {} + } + // Start reading a value from the future, but cancel the read before telling the peer to write. { let next = Box::pin(callee_future_rx1.into_future()); diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index f9b5c75780..a3d5fb56d4 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -2231,6 +2231,21 @@ impl ComponentInstance { ReadState::Open }; + let set_guest_ready = |me: &mut Self| { + let transmit = me.get_mut(transmit_id)?; + assert!(matches!(&transmit.write, WriteState::Open)); + transmit.write = WriteState::GuestReady { + ty, + flat_abi, + options, + address: usize::try_from(address).unwrap(), + count: usize::try_from(count).unwrap(), + handle, + post_write: PostWrite::Continue, + }; + Ok::<_, crate::Error>(()) + }; + let result = match mem::replace(&mut transmit.read, new_state) { ReadState::GuestReady { ty: read_ty, @@ -2239,10 +2254,12 @@ impl ComponentInstance { address: read_address, count: read_count, handle: read_handle, - .. } => { assert_eq!(flat_abi, read_flat_abi); + let write_complete = count == 0 || read_count > 0; + let read_complete = count > 0; + let read_handle_rep = transmit.read_handle.rep(); let count = count.min(read_count); @@ -2261,7 +2278,8 @@ impl ComponentInstance { )?; let code = ReturnCode::Completed(count.try_into().unwrap()); - { + + if read_complete { self.push_event( read_handle_rep, match read_ty { @@ -2277,9 +2295,24 @@ impl ComponentInstance { }, }, )?; + } else { + let transmit = self.get_mut(transmit_id)?; + transmit.read = ReadState::GuestReady { + ty: read_ty, + flat_abi: read_flat_abi, + options: read_options, + address: read_address, + count: read_count, + handle: read_handle, + }; } - code + if write_complete { + code + } else { + set_guest_ready(self)?; + ReturnCode::Blocked + } } ReadState::HostReady { accept } => { @@ -2295,19 +2328,7 @@ impl ComponentInstance { } ReadState::Open => { - assert!(matches!(&transmit.write, WriteState::Open)); - - let transmit = self.get_mut(transmit_id)?; - transmit.write = WriteState::GuestReady { - ty, - flat_abi, - options, - address: usize::try_from(address).unwrap(), - count: usize::try_from(count).unwrap(), - handle, - post_write: PostWrite::Continue, - }; - + set_guest_ready(self)?; ReturnCode::Blocked } @@ -2370,6 +2391,20 @@ impl ComponentInstance { WriteState::Open }; + let set_guest_ready = |me: &mut Self| { + let transmit = me.get_mut(transmit_id)?; + assert!(matches!(&transmit.read, ReadState::Open)); + transmit.read = ReadState::GuestReady { + ty, + flat_abi, + options, + address: usize::try_from(address).unwrap(), + count: usize::try_from(count).unwrap(), + handle, + }; + Ok::<_, crate::Error>(()) + }; + let result = match mem::replace(&mut transmit.write, new_state) { WriteState::GuestReady { ty: write_ty, @@ -2384,6 +2419,9 @@ impl ComponentInstance { let write_handle_rep = transmit.write_handle.rep(); + let write_complete = write_count == 0 || count > 0; + let read_complete = write_count > 0; + let count = usize::try_from(count).unwrap().min(write_count); self.copy( @@ -2407,7 +2445,8 @@ impl ComponentInstance { }; let code = ReturnCode::Completed(count.try_into().unwrap()); - { + + if write_complete { self.push_event( write_handle_rep, match write_ty { @@ -2421,9 +2460,25 @@ impl ComponentInstance { }, }, )?; + } else { + let transmit = self.get_mut(transmit_id)?; + transmit.write = WriteState::GuestReady { + ty: write_ty, + flat_abi: write_flat_abi, + options: write_options, + address: write_address, + count: write_count, + handle: write_handle, + post_write, + }; } - code + if read_complete { + code + } else { + set_guest_ready(self)?; + ReturnCode::Blocked + } } WriteState::HostReady { accept, post_write } => { @@ -2445,18 +2500,7 @@ impl ComponentInstance { } WriteState::Open => { - assert!(matches!(&transmit.read, ReadState::Open)); - - let transmit = self.get_mut(transmit_id)?; - transmit.read = ReadState::GuestReady { - ty, - flat_abi, - options, - address: usize::try_from(address).unwrap(), - count: usize::try_from(count).unwrap(), - handle, - }; - + set_guest_ready(self)?; ReturnCode::Blocked }