Skip to content

Commit 053bfc2

Browse files
committed
feat: add PyFuture to await Python awaitables
1 parent 199d4f5 commit 053bfc2

File tree

13 files changed

+686
-173
lines changed

13 files changed

+686
-173
lines changed

guide/src/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
- [Mapping of Rust types to Python types](conversions/tables.md)
2525
- [Conversion traits](conversions/traits.md)
2626
- [Using `async` and `await`](async-await.md)
27+
- [Awaiting Python awaitables](async-await/pyfuture.md)
2728
- [Parallelism](parallelism.md)
2829
- [Debugging](debugging.md)
2930
- [Features reference](features.md)

guide/src/async-await/pyfuture.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# Awaiting Python awaitables
2+
3+
Python awaitable can be awaited on Rust side using [`PyFuture`]({{#PYO3_DOCS_URL}}/pyo3/types/struct.PyFuture.html).
4+
5+
```rust
6+
# # ![allow(dead_code)]
7+
# #[cfg(feature = "experimental-async")] {
8+
use pyo3::{prelude::*, types::PyFuture};
9+
10+
#[pyfunction]
11+
async fn wrap_awaitable(awaitable: PyObject) -> PyResult<PyObject> {
12+
let future = Python::with_gil(|gil| PyFuture::from_unbound_object(gil, awaitable))?;
13+
future.await
14+
}
15+
# }
16+
```
17+
18+
`PyFuture` is constructed from a Python awaitablef by calling its `__await__` method
19+
(or `__iter__` for generator-based coroutine).
20+
21+
## Restrictions
22+
23+
`PyFuture` can only be awaited in the context of a PyO3 coroutine. Otherwise, it panics.
24+
25+
```rust
26+
# # ![allow(dead_code)]
27+
# #[cfg(feature = "experimental-async")] {
28+
use pyo3::{prelude::*, types::PyFuture};
29+
30+
#[pyfunction]
31+
fn block_on(awaitable: PyObject) -> PyResult<PyObject> {
32+
let future = Python::with_gil(|gil| PyFuture::from_unbound_object(gil, awaitable))?;
33+
futures::executor::block_on(future) // ERROR: PyFuture must be awaited in coroutine context
34+
}
35+
# }
36+
```
37+
38+
`PyFuture` must be the only Rust future awaited; it means that it's forbidden to `select!` a `Pyfuture`. Otherwise, it
39+
panics.
40+
41+
```rust
42+
# # ![allow(dead_code)]
43+
# #[cfg(feature = "experimental-async")] {
44+
use std::future;
45+
use futures::FutureExt;
46+
use pyo3::{prelude::*, types::PyFuture};
47+
48+
#[pyfunction]
49+
async fn select(awaitable: PyObject) -> PyResult<PyObject> {
50+
let future = Python::with_gil(|gil| PyFuture::from_unbound_object(gil, awaitable))?;
51+
futures::select_biased! {
52+
_ = future::pending::<()>().fuse() => unreachable!(),
53+
res = future.fuse() => res, // ERROR: Python awaitable mixed with Rust future
54+
}
55+
}
56+
# }
57+
```
58+
59+
These restrictions exist because awaiting a `PyFuture` strongly binds it to the enclosing coroutine. The coroutine will
60+
then delegate its `send`/`throw`/`close` methods to the awaited `PyFuture`. If it was awaited in
61+
a `select!`, `Coroutine::send` would no able to know if the value passed would have to be delegated to the `Pyfuture` or
62+
not.

newsfragments/3611.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add `PyFuture` to await Python awaitables

pyo3-ffi/src/abstract_.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,11 @@ extern "C" {
129129
pub fn PyIter_Next(arg1: *mut PyObject) -> *mut PyObject;
130130
#[cfg(all(not(PyPy), Py_3_10))]
131131
#[cfg_attr(PyPy, link_name = "PyPyIter_Send")]
132-
pub fn PyIter_Send(iter: *mut PyObject, arg: *mut PyObject, presult: *mut *mut PyObject);
132+
pub fn PyIter_Send(
133+
iter: *mut PyObject,
134+
arg: *mut PyObject,
135+
presult: *mut *mut PyObject,
136+
) -> c_int;
133137

134138
#[cfg_attr(PyPy, link_name = "PyPyNumber_Check")]
135139
pub fn PyNumber_Check(o: *mut PyObject) -> c_int;

src/coroutine.rs

Lines changed: 57 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,29 @@ use std::{
1111
use pyo3_macros::{pyclass, pymethods};
1212

1313
use crate::{
14-
coroutine::{cancel::ThrowCallback, waker::AsyncioWaker},
14+
coroutine::{cancel::ThrowCallback, waker::CoroutineWaker},
1515
exceptions::{PyAttributeError, PyRuntimeError, PyStopIteration},
1616
panic::PanicException,
17-
types::{string::PyStringMethods, PyIterator, PyString},
18-
Bound, IntoPy, Py, PyAny, PyErr, PyObject, PyResult, Python,
17+
types::{string::PyStringMethods, PyString},
18+
IntoPy, Py, PyErr, PyObject, PyResult, Python,
1919
};
2020

21+
mod asyncio;
2122
pub(crate) mod cancel;
22-
mod waker;
23+
pub(crate) mod waker;
2324

24-
use crate::marker::Ungil;
2525
pub use cancel::CancelHandle;
2626

27+
use crate::{exceptions::PyGeneratorExit, marker::Ungil};
28+
2729
const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine";
2830

31+
pub(crate) enum CoroOp {
32+
Send(PyObject),
33+
Throw(PyObject),
34+
Close,
35+
}
36+
2937
trait CoroutineFuture: Send {
3038
fn poll(self: Pin<&mut Self>, py: Python<'_>, waker: &Waker) -> Poll<PyResult<PyObject>>;
3139
}
@@ -69,7 +77,7 @@ pub struct Coroutine {
6977
qualname_prefix: Option<&'static str>,
7078
throw_callback: Option<ThrowCallback>,
7179
future: Option<Pin<Box<dyn CoroutineFuture>>>,
72-
waker: Option<Arc<AsyncioWaker>>,
80+
waker: Option<Arc<CoroutineWaker>>,
7381
}
7482

7583
impl Coroutine {
@@ -104,58 +112,55 @@ impl Coroutine {
104112
}
105113
}
106114

107-
fn poll(&mut self, py: Python<'_>, throw: Option<PyObject>) -> PyResult<PyObject> {
115+
fn poll_inner(&mut self, py: Python<'_>, mut op: CoroOp) -> PyResult<PyObject> {
108116
// raise if the coroutine has already been run to completion
109117
let future_rs = match self.future {
110118
Some(ref mut fut) => fut,
111119
None => return Err(PyRuntimeError::new_err(COROUTINE_REUSED_ERROR)),
112120
};
113-
// reraise thrown exception it
114-
match (throw, &self.throw_callback) {
115-
(Some(exc), Some(cb)) => cb.throw(exc),
116-
(Some(exc), None) => {
117-
self.close();
118-
return Err(PyErr::from_value_bound(exc.into_bound(py)));
119-
}
120-
(None, _) => {}
121+
// if the future is not pending on a Python awaitable,
122+
// execute throw callback or complete on close
123+
if !matches!(self.waker, Some(ref w) if w.is_delegated(py)) {
124+
match op {
125+
send @ CoroOp::Send(_) => op = send,
126+
CoroOp::Throw(exc) => match &self.throw_callback {
127+
Some(cb) => {
128+
cb.throw(exc.clone_ref(py));
129+
op = CoroOp::Send(py.None());
130+
}
131+
None => return Err(PyErr::from_value_bound(exc.into_bound(py))),
132+
},
133+
CoroOp::Close => return Err(PyGeneratorExit::new_err(py.None())),
134+
};
121135
}
122136
// create a new waker, or try to reset it in place
123137
if let Some(waker) = self.waker.as_mut().and_then(Arc::get_mut) {
124-
waker.reset();
138+
waker.reset(op);
125139
} else {
126-
self.waker = Some(Arc::new(AsyncioWaker::new()));
140+
self.waker = Some(Arc::new(CoroutineWaker::new(op)));
127141
}
128-
// poll the future and forward its results if ready
142+
// poll the future and forward its results if ready; otherwise, yield from waker
129143
// polling is UnwindSafe because the future is dropped in case of panic
130144
let waker = Waker::from(self.waker.clone().unwrap());
131145
let poll = || future_rs.as_mut().poll(py, &waker);
132146
match panic::catch_unwind(panic::AssertUnwindSafe(poll)) {
133-
Ok(Poll::Ready(res)) => {
134-
self.close();
135-
return Err(PyStopIteration::new_err(res?));
136-
}
137-
Err(err) => {
138-
self.close();
139-
return Err(PanicException::from_panic_payload(err));
140-
}
141-
_ => {}
147+
Err(err) => Err(PanicException::from_panic_payload(err)),
148+
Ok(Poll::Ready(res)) => Err(PyStopIteration::new_err(res?)),
149+
Ok(Poll::Pending) => match self.waker.as_ref().unwrap().yield_(py) {
150+
Ok(to_yield) => Ok(to_yield),
151+
Err(err) => Err(err),
152+
},
142153
}
143-
// otherwise, initialize the waker `asyncio.Future`
144-
if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? {
145-
// `asyncio.Future` must be awaited; fortunately, it implements `__iter__ = __await__`
146-
// and will yield itself if its result has not been set in polling above
147-
if let Some(future) = PyIterator::from_bound_object(&future.as_borrowed())
148-
.unwrap()
149-
.next()
150-
{
151-
// future has not been leaked into Python for now, and Rust code can only call
152-
// `set_result(None)` in `Wake` implementation, so it's safe to unwrap
153-
return Ok(future.unwrap().into());
154-
}
154+
}
155+
156+
fn poll(&mut self, py: Python<'_>, op: CoroOp) -> PyResult<PyObject> {
157+
let result = self.poll_inner(py, op);
158+
if result.is_err() {
159+
// the Rust future is dropped, and the field set to `None`
160+
// to indicate the coroutine has been run to completion
161+
drop(self.future.take());
155162
}
156-
// if waker has been waken during future polling, this is roughly equivalent to
157-
// `await asyncio.sleep(0)`, so just yield `None`.
158-
Ok(py.None().into_py(py))
163+
result
159164
}
160165
}
161166

@@ -180,25 +185,27 @@ impl Coroutine {
180185
}
181186
}
182187

183-
fn send(&mut self, py: Python<'_>, _value: &Bound<'_, PyAny>) -> PyResult<PyObject> {
184-
self.poll(py, None)
188+
fn send(&mut self, py: Python<'_>, value: PyObject) -> PyResult<PyObject> {
189+
self.poll(py, CoroOp::Send(value))
185190
}
186191

187192
fn throw(&mut self, py: Python<'_>, exc: PyObject) -> PyResult<PyObject> {
188-
self.poll(py, Some(exc))
193+
self.poll(py, CoroOp::Throw(exc))
189194
}
190195

191-
fn close(&mut self) {
192-
// the Rust future is dropped, and the field set to `None`
193-
// to indicate the coroutine has been run to completion
194-
drop(self.future.take());
196+
fn close(&mut self, py: Python<'_>) -> PyResult<()> {
197+
match self.poll(py, CoroOp::Close) {
198+
Ok(_) => Ok(()),
199+
Err(err) if err.is_instance_of::<PyGeneratorExit>(py) => Ok(()),
200+
Err(err) => Err(err),
201+
}
195202
}
196203

197204
fn __await__(self_: Py<Self>) -> Py<Self> {
198205
self_
199206
}
200207

201208
fn __next__(&mut self, py: Python<'_>) -> PyResult<PyObject> {
202-
self.poll(py, None)
209+
self.poll(py, CoroOp::Send(py.None()))
203210
}
204211
}

src/coroutine/asyncio.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
//! Coroutine implementation compatible with asyncio.
2+
use pyo3_macros::pyfunction;
3+
4+
use crate::{
5+
intern,
6+
sync::GILOnceCell,
7+
types::{PyAnyMethods, PyCFunction, PyIterator},
8+
wrap_pyfunction_bound, Bound, IntoPy, Py, PyAny, PyObject, PyResult, Python,
9+
};
10+
11+
/// `asyncio.get_running_loop`
12+
fn get_running_loop(py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
13+
static GET_RUNNING_LOOP: GILOnceCell<PyObject> = GILOnceCell::new();
14+
let import = || -> PyResult<_> {
15+
let module = py.import_bound("asyncio")?;
16+
Ok(module.getattr("get_running_loop")?.into())
17+
};
18+
GET_RUNNING_LOOP
19+
.get_or_try_init(py, import)?
20+
.bind(py)
21+
.call0()
22+
}
23+
24+
/// Asyncio-compatible coroutine waker.
25+
///
26+
/// Polling a Rust future yields an `asyncio.Future`, whose `set_result` method is called
27+
/// when `Waker::wake` is called.
28+
pub(super) struct AsyncioWaker {
29+
event_loop: PyObject,
30+
future: PyObject,
31+
}
32+
33+
impl AsyncioWaker {
34+
pub(super) fn new(py: Python<'_>) -> PyResult<Self> {
35+
let event_loop = get_running_loop(py)?.into_py(py);
36+
let future = event_loop.call_method0(py, "create_future")?;
37+
Ok(Self { event_loop, future })
38+
}
39+
40+
pub(super) fn yield_(&self, py: Python<'_>) -> PyResult<PyObject> {
41+
let __await__;
42+
// `asyncio.Future` must be awaited; in normal case, it implements `__iter__ = __await__`,
43+
// but `create_future` may have been overriden
44+
let mut iter = match PyIterator::from_bound_object(self.future.bind(py)) {
45+
Ok(iter) => iter,
46+
Err(_) => {
47+
__await__ = self.future.call_method0(py, intern!(py, "__await__"))?;
48+
PyIterator::from_bound_object(__await__.bind(py))?
49+
}
50+
};
51+
// future has not been wakened (because `yield_waken` would have been called
52+
// otherwise), so it is expected to yield itself
53+
Ok(iter.next().expect("future didn't yield")?.into_py(py))
54+
}
55+
56+
#[allow(clippy::unnecessary_wraps)]
57+
pub(super) fn yield_waken(py: Python<'_>) -> PyResult<PyObject> {
58+
Ok(py.None().into())
59+
}
60+
61+
pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> {
62+
static RELEASE_WAITER: GILOnceCell<Py<PyCFunction>> = GILOnceCell::new();
63+
let release_waiter = RELEASE_WAITER.get_or_try_init(py, || {
64+
wrap_pyfunction_bound!(release_waiter, py).map(Into::into)
65+
})?;
66+
// `Future.set_result` must be called in event loop thread,
67+
// so it requires `call_soon_threadsafe`
68+
let call_soon_threadsafe = self.event_loop.call_method1(
69+
py,
70+
intern!(py, "call_soon_threadsafe"),
71+
(release_waiter, &self.future),
72+
);
73+
if let Err(err) = call_soon_threadsafe {
74+
// `call_soon_threadsafe` will raise if the event loop is closed;
75+
// instead of catching an unspecific `RuntimeError`, check directly if it's closed.
76+
let is_closed = self.event_loop.call_method0(py, "is_closed")?;
77+
if !is_closed.extract(py)? {
78+
return Err(err);
79+
}
80+
}
81+
Ok(())
82+
}
83+
}
84+
85+
/// Call `future.set_result` if the future is not done.
86+
///
87+
/// Future can be cancelled by the event loop before being wakened.
88+
/// See <https://github.com/python/cpython/blob/main/Lib/asyncio/tasks.py#L452C5-L452C5>
89+
#[pyfunction(crate = "crate")]
90+
fn release_waiter(future: &Bound<'_, PyAny>) -> PyResult<()> {
91+
let done = future.call_method0(intern!(future.py(), "done"))?;
92+
if !done.extract::<bool>()? {
93+
future.call_method1(intern!(future.py(), "set_result"), (future.py().None(),))?;
94+
}
95+
Ok(())
96+
}

0 commit comments

Comments
 (0)