Use anyio to support an idiomatic async interface #558

Open
1 task done
Zac-HD opened this issue Apr 18, 2024 · 0 comments
Labels
feature-request A feature should be added or improved. p3 This is a minor priority issue

Comments

Zac-HD commented Apr 18, 2024

Describe the feature

Reading that "All network operations in awscrt.http are asynchronous" caused me a brief moment of confusion, because the modern idiom for asynchronous programming in Python uses async/await rather than raw futures.

More specifically, the community is moving towards "structured concurrency" - with the Trio framework, new features in the stdlib asyncio module, and the anyio library (which allows libraries to work with the user's choice of Trio or asyncio).
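As a minimal sketch of what that buys a library (not specific to awscrt), the same anyio-based coroutine runs unchanged on either backend:

import anyio

async def main():
    # Structured concurrency: the task group guarantees both tasks have
    # finished (or been cancelled) before the `async with` block exits.
    async with anyio.create_task_group() as tg:
        tg.start_soon(anyio.sleep, 0.1)
        tg.start_soon(anyio.sleep, 0.2)

anyio.run(main)                   # runs on the asyncio backend by default
anyio.run(main, backend="trio")   # identical code on Trio, if installed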

Use Case

We're increasingly using async Python with Trio at work for all our dev tooling, and so I'd love to see aws-crt-python support syntactic asynchrony (async/await) on top of the current return-a-future design. This would also be useful for downstream work such as aio-libs/aiobotocore#1106.

Proposed Solution

Looking through the docs, I see three kinds of things that need to be made awaitable: futures, events, and websockets.

The simple cases

Happily, it's trivial to support a very efficient wrapper for Futures:

import anyio
import concurrent.futures

class WaitableFuture(concurrent.futures.Future):
    async def wait(self):
        # Bridge the callback-based Future to async/await: set an anyio.Event
        # from the done-callback, await it, then return the (now ready) result.
        evt = anyio.Event()
        self.add_done_callback(lambda _: evt.set())
        await evt.wait()
        return self.result(timeout=0)

# Users do a simple syntax-level transformation, e.g.:
client = awscrt.http.HttpClientConnection.new().result()  # current sync api
client = await awscrt.http.HttpClientConnection.new().wait()  # proposed

While it's always possible to build this kind of wrapper downstream, it'd be nice to do it once upstream and have that work for everyone.
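For comparison, here's roughly what the downstream workaround looks like today: a free function that awaits any existing concurrent.futures.Future, since downstream code can't make awscrt return a subclass. The await_future name below is purely illustrative, not part of any API:

import anyio
import concurrent.futures

async def await_future(future: concurrent.futures.Future):
    # Same bridging trick as above, but applied to a Future we didn't create.
    evt = anyio.Event()
    future.add_done_callback(lambda _: evt.set())
    await evt.wait()
    return future.result(timeout=0)

# e.g.  client = await await_future(awscrt.http.HttpClientConnection.new())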

An awaitable version of threading.Event gives us the same pleasant user experience, at the cost of a gnarlier implementation: either we accept some pointless latency from a polling implementation, or we take on the complexity of managing a 'waiter' thread:

# Again, users do a simple syntax-level transformation, e.g.:
loop = awscrt.io.EventLoopGroup()
loop.shutdown_event.wait()  # current sync api
await loop.shutdown_event.wait_async()  # either option below

# ---- implementation ----
import anyio
import threading
import weakref

class Event:
    def __init__(self, event: threading.Event, /) -> None:
        self.__event = event

    def wait_sync(self, timeout: float | None = None) -> bool:
        return self.__event.wait(timeout=timeout)

    # Polling from the main thread looks good from a code-complexity and system-resources
    # perspective, but incurs pointless delays of up to `interval` seconds.
    async def wait_async_polling(self, *, interval=0.1) -> bool:
        while not self.__event.is_set():
            await anyio.sleep(interval)
        return True

    # We can avoid that delay by using one 'waiter' thread per `await`ed threading.Event:
    _already_waiting_events: weakref.WeakKeyDictionary[threading.Event, anyio.Event] = (
        weakref.WeakKeyDictionary()
    )

    async def wait_async_worker_thread(self) -> bool:
        if self.__event in self._already_waiting_events:
            await self._already_waiting_events[self.__event].wait()
            return True
        
        def wait_for_event(threading_event, set_anyio_event):
            while not threading_event.wait(timeout=1.0):
                # If the `await` is cancelled, stop quickly even if the underlying event hasn't fired yet.
                anyio.from_thread.check_cancelled()
            anyio.from_thread.run_sync(set_anyio_event)
            return True

        self._already_waiting_events[self.__event] = anyio.Event()
        return await anyio.to_thread.run_sync(
            wait_for_event,
            self.__event,
            self._already_waiting_events[self.__event].set,
            cancellable=True,
            # We want to avoid a global threadpool limit and corresponding possibility
            # of deadlocks.  Since we're confident that there's at least one thread
            # doing useful work for each outstanding event and we ensure only one
            # waiter-thread for each threading.Event, we don't need a limit here.
            limiter=anyio.CapacityLimiter(1),
        )

If we could arrange for a callback when the event is set, that would allow for a more elegant implementation than simply wrapping the threading.Event, but I prefer not to require supporting changes for an initial version.
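For illustration only, assuming a hypothetical set-callback hook (called add_set_callback below; nothing like it exists in the current API), the event wrapper would collapse to the same shape as the Future wrapper:

import anyio

class CallbackEvent:
    def __init__(self, native_event) -> None:
        # Hypothetical event object that can register a callback for when it's set.
        self.__event = native_event

    async def wait_async(self) -> bool:
        if self.__event.is_set():
            return True
        evt = anyio.Event()
        self.__event.add_set_callback(evt.set)  # hypothetical hook, see caveat above
        await evt.wait()
        return True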

More complicated cases

Higher-level wrappers for HTTP streams and websockets would also be nice - but even if this is desired I'd suggest starting with the low-level async/await shims and encouraging some downstream experimentation before committing to e.g. exposing a wsproto interface.

Acknowledgements

  • I may be able to implement this feature request
@Zac-HD Zac-HD added feature-request A feature should be added or improved. needs-triage This issue or PR still needs to be triaged. labels Apr 18, 2024
@jmklix jmklix added p3 This is a minor priority issue and removed needs-triage This issue or PR still needs to be triaged. labels Apr 22, 2024