
trio.Queue methods do not play with inspect.iscoroutinefunction #635

Closed
achennu opened this issue Aug 28, 2018 · 10 comments

Comments

achennu commented Aug 28, 2018

I've been studying trio for a while now and boy, it is a breath of fresh air -- in terms of API design, documentation and even in the clarity of design (and code) it brings to me. Thank you @njsmith for your fantastic explanations in your blogs and in the docs.

Now, I've run into a situation where trio seemingly does not reflect the nature of the function correctly.

>>> import trio, inspect
>>> q = trio.Queue(2)
>>> inspect.iscoroutinefunction(q.put)
False
>>> inspect.iscoroutinefunction(q.get)
False

These are async def functions, but it seems that the @_core.enable_ki_protection decorator messes with the introspectability of the code.

A couple of searches on this repo and in the docs did not throw up any information. Is this a bug?

I'm using trio version 0.6.0 and python 3.7.

smurfix commented Aug 28, 2018

This is not a bug. inspect.iscoroutinefunction can report false negatives (for example on wrapped or decorated async functions), and you cannot depend on it not doing so; that's an integral part of how Python works, and one of the reasons why trio passes async functions around uncalled and lets the callee call them:

async def some_code():
    pass
async def run(proc):
    await proc()
await run(some_code)

while asyncio's idiom is:

async def some_code():
    pass
async def run(proc):
    await proc
await run(some_code())
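To make the difference concrete, here is a self-contained sketch of the two calling conventions. The function names are made up for illustration, and asyncio.run is used only because it's a stdlib way to drive an event loop:

```python
import asyncio

async def some_code():
    return "done"

# trio-style: pass the async *function*; the runner calls it itself
async def run_fn(proc):
    return await proc()

# asyncio-style: pass an already-created *coroutine object*
async def run_coro(coro):
    return await coro

print(asyncio.run(run_fn(some_code)))      # done
print(asyncio.run(run_coro(some_code())))  # done
```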

smurfix commented Aug 28, 2018

Or, put another way: you really don't want code that depends on introspection when you use it. If you really need to do it, use

from inspect import iscoroutine

async def run(maybe_async_proc):
    result = maybe_async_proc()
    if iscoroutine(result):
        result = await result
    return result

but, again, you really shouldn't do that.

achennu commented Aug 28, 2018

Hmm, I see. Thank you @smurfix

So would this be a reasonable way to handle it:

result = func(*args, **kwargs)
if inspect.isawaitable(result):
    result = await result
return result
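For what it's worth, here is a runnable version of that sketch. The helper and function names are invented, and asyncio.run is used only to drive the example; the helper itself is framework-agnostic:

```python
import asyncio
import inspect

async def call_maybe_async(func, *args, **kwargs):
    # Call func, and await the result only if it turns out to be awaitable.
    result = func(*args, **kwargs)
    if inspect.isawaitable(result):
        result = await result
    return result

def sync_add(a, b):
    return a + b

async def async_add(a, b):
    return a + b

async def main():
    print(await call_maybe_async(sync_add, 1, 2))   # 3
    print(await call_maybe_async(async_add, 3, 4))  # 7

asyncio.run(main())
```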

smurfix commented Aug 28, 2018

Well … the question is, why do you need an interface that accepts both sync and async functions in the first place?
The main use case for this pattern is "callback functions", but in Trio we don't use callbacks. The idiomatic replacement for

[async] def maybe_async_callback(data):
    [await] process(data)
# run_server() doesn't return, unless cancelled
await run_server(…, maybe_async_callback)

is

async with run_server(…) as service:
    async for data in service:
        [await] process(data)

achennu commented Aug 29, 2018

Related to the topic of this issue:

  • could you point me to where it is said that the results of inspect.iscoroutinefunction cannot be relied on?
  • If trio strives for correctness, then I would imagine that the public API of a basic primitive (trio.Queue) would be correctly inspectable. Just my opinion, as I'm not aware of the intricacies of why this may not be possible.
  • Yes, it may not be a 'bug' in terms of incorrect operation, but it can certainly lead to incorrect behavior in calling code that introspects these methods with inspect.

Thank you for that response though. I'm actually trying to figure out how to translate existing code based on tornado loops into trio-esque code. Although this isn't directly related to the topic of this issue, I hope I can ask your advice on how to write things with trio. I'm not used to writing networking code, so I'm not sure what async with run_server(...) as service is setting up.

Consider the class:

class timed_window(Stream):
    """ Emit a tuple of collected results every interval

    Every ``interval`` seconds this emits a tuple of all of the results
    seen so far.  This can help to batch data coming off of a high-volume
    stream.
    """
    def __init__(self, upstream, interval, **kwargs):
        self.interval = convert_interval(interval)
        self.buffer = []
        self.last = gen.moment

        Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)

        self.loop.add_callback(self.cb)

    def update(self, x, who=None):
        self.buffer.append(x)
        return self.last

    @gen.coroutine
    def cb(self):
        while True:
            L, self.buffer = self.buffer, []
            self.last = self._emit(L)
            yield self.last
            yield gen.sleep(self.interval)

When this class is initialized, an existing or created IOLoop object is used to start the long-running self.cb() task. My attempt to write the same thing with trio is

class timed_window(Stream):
    """ Emit a tuple of collected results every interval

    Every ``interval`` seconds this emits a tuple of all of the results
    seen so far.  This can help to batch data coming off of a high-volume
    stream.
    """
    _graphviz_shape = 'octagon'

    def __init__(self, upstream, interval, **kwargs):
        self.interval = convert_interval(interval)
        self.buffer = []
        self.last = time.time()
        Stream.__init__(self, upstream, **kwargs)

    async def update(self, x, who = None):
        self.buffer.append(x)

    async def cb(self):
        while True:
            await trio.sleep(self.interval)
            L, self.buffer = self.buffer, []
            await self.emit(L)

Now, I don't see how to start the self.cb task here, as async functions cannot be called from __init__. Does this imply that the code responsible for initializing this instance must also start a nursery and call self.cb? Or is there a way to write an async def __aenter__ function that sets up a nursery for handling all child tasks?

Apologies if this is too off-topic. I'd be glad to take pointers on another forum to ask this on.

njsmith commented Aug 29, 2018

@achennu Another example where iscoroutinefunction gets confused is functools.partial:

In [3]: inspect.iscoroutinefunction(my_async_fn)
Out[3]: True

In [4]: inspect.iscoroutinefunction(partial(my_async_fn, 1))
Out[4]: False

So yeah, calling it and then checking isawaitable or isinstance(..., collections.abc.Coroutine) or similar is generally going to be more reliable. But like @smurfix said, really it's best to avoid putting yourself in a position where you have to guess whether a function is async or sync, if you can possibly avoid it...
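One caveat on the partial example: starting with Python 3.8, inspect.iscoroutinefunction learned to unwrap functools.partial objects, so that particular false negative is fixed on newer interpreters. A plain-def wrapper still defeats it on every version, though; a small sketch:

```python
import inspect
from functools import partial, wraps

async def my_async_fn(x):
    return x

# Version-dependent: False on Python 3.7 (as in this issue),
# True on 3.8+ where partial objects are unwrapped.
print(inspect.iscoroutinefunction(partial(my_async_fn, 1)))

# A plain def wrapper is a false negative on every version, even
# though @wraps copies metadata over from the async function:
@wraps(my_async_fn)
def wrapper(*args, **kwargs):
    return my_async_fn(*args, **kwargs)

print(inspect.iscoroutinefunction(wrapper))  # False
```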

njsmith commented Aug 29, 2018

Oh sorry, I didn't see your latest reply when writing that.

If trio strives for correctness, then I would imagine that the public API of a basic primitive (trio.Queue) would be correctly inspectable. Just my opinion, as I'm not aware of the intricacies of why this may not be possible.

Well.... the problem is inspect.iscoroutinefunction does tell you, reliably, whether the function you are looking at is the exact Python object returned by some async def statement. But that's almost never what you actually want to know! What you really want to know is whether the object you are looking at quacks like an async function. And unfortunately Python simply doesn't have an inspect.quackslikecoroutinefunction routine, so there's no way to ask that :-(.
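One can get closer to "quacks like" with a heuristic that unwraps partials and @wraps chains before checking. This is only a sketch (the function name is made up), and it still misses cases such as a plain def that returns a coroutine, or an object whose __call__ is async:

```python
import functools
import inspect

def looks_like_async_fn(obj):
    # Heuristic: peel off functools.partial layers and __wrapped__
    # chains (set by functools.wraps), then check what's underneath.
    while True:
        if isinstance(obj, functools.partial):
            obj = obj.func
        elif hasattr(obj, "__wrapped__"):
            obj = obj.__wrapped__
        else:
            return inspect.iscoroutinefunction(obj)

async def afn():
    pass

@functools.wraps(afn)
def wrapper(*args, **kwargs):
    return afn(*args, **kwargs)

print(inspect.iscoroutinefunction(wrapper))             # False
print(looks_like_async_fn(wrapper))                     # True
print(looks_like_async_fn(functools.partial(wrapper)))  # True
```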

So it implies that the code that is responsible for initializing this instance must somehow then also start a nursery and call the self.cb function? Or is there a way to write an def __aenter__ function that can have a nursery setup for handling all child tasks?

If you have an object that requires a background task, then one nice idiom is to make your public "constructor" be an async with, kind of like how open_nursery works. So like, your users do:

async with open_batched_stream(...) as batched_stream:
    async for batch in batched_stream:
        ...

and that async with takes care of the background task, and also cleaning up any other associated resources (like TCP connections or whatever). You can implement this like:

from contextlib import asynccontextmanager

@asynccontextmanager
async def open_batched_stream(...):
    async with trio.open_nursery() as nursery:
        batched_stream = BatchedStream(...)
        nursery.start_soon(batched_stream._background_task)
        yield batched_stream
        nursery.cancel_scope.cancel()  # cancel the background task

(asynccontextmanager is available in contextlib on python 3.7, or if you need to support older pythons then the async_generator library has a backport.)
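The same start-on-enter, cancel-on-exit shape can be sketched with only the stdlib. The names here are invented, and asyncio stands in purely so the example runs without trio; the trio version would use a nursery instead of create_task:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_ticker():
    # Start a background task on entry; cancel it on exit.
    ticks = []

    async def background():
        while True:
            await asyncio.sleep(0.01)
            ticks.append("tick")

    task = asyncio.create_task(background())
    try:
        yield ticks
    finally:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

async def main():
    async with open_ticker() as ticks:
        await asyncio.sleep(0.05)  # the ticker runs in the background
    return len(ticks)

print(asyncio.run(main()) > 0)  # True
```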

achennu commented Aug 29, 2018

Thanks @njsmith for the clarification.

I consider this issue now closable, regarding the inspected nature of trio.Queue methods.

I am trying to internalize the API design patterns that trio brings, and your explanation helps. (Your reply came in as I was on the doc page of @asynccontextmanager!) I have written a trio port of streamz.core, with all tests passing. The only missing parts are the classes that invoke such background tasks in __init__, which isn't possible when those tasks are async def functions.

In taking on this type of API pattern, I'm wondering how 'expensive' it is to create nurseries? In my Stream class, I have:

class Stream:
    def __init__(self, ...):
        ...
        self.downstreams = downstreams

    async def update(self, x, who=None):
        await self.emit(x)

    async def emit(self, x):
        async with trio.open_nursery() as nursery:
            for downstream in self.downstreams:
                nursery.start_soon(downstream.update, x, self)

In such an implementation, each call to emit of each node in the flow path creates a nursery to update its downstreams. The advantage is that each emit() is awaitable (with automatic backpressure handling for downstream elements that use trio.Queue). However, it is profuse in terms of number of nurseries. I'm not sure how to measure if this scales poorly.

Considering your comments on using different constructors, I suppose it would have to look something like:

from trio_streamz import run_dataflow, core

stream = core.Stream()
f1 = stream.map(lambda x: x+1).timed_window(2)
f2 = stream.map(lambda x: x*2).partition(3)
z = core.zip(f1, f2)

async with run_dataflow() as runner:
    for i in range(10):
        runner.emit(stream, i)

This gives the chance that one runner has a nursery, and that's the one used for all emit calls. Is this for some reason preferable?

njsmith commented Aug 29, 2018

I haven't measured the cost of setting up and tearing down a nursery, but it's just creating and manipulating a few python data structures, not doing any heavy computation or operating system calls or anything. I'd say, do whatever makes the code most readable, and if later on you discover it's causing speed problems, let us know and we'll try to make the readable code fast :-)

It is true that if you want nodes in your flow graph to be "live" outside of calls to emit, then you'll need some kind of nursery that denotes the boundary of their life and some way to pass the nursery through where it's needed. I'm not familiar enough with your API to guess what would be the most idiomatic way to do that, but some options I can imagine would be:

  • Passing it into just the methods that need background tasks (delay etc.)
  • Binding a nursery to the Stream object, and making it available to downstreams.
  • Something like your runner idea

By the way, just something to be aware of: trio has its own very different interface called Stream, which is used a lot (e.g. trio.open_tcp_stream). Nothing to be done really, but maybe something to point out in your docs eventually so people don't get confused :-)

njsmith commented Aug 30, 2018

For the autodetection thing, I'd suggest picking one of these conventions:

  • map_sync takes a sync fn, map_async takes an async fn

  • map takes a sync fn, map_async takes an async fn

  • map_sync takes a sync fn, map takes an async fn

(The other option is amap, but that convention mostly gets used when the function being named is itself async, so I wouldn't use it here – you don't want people to think they should be writing await stream.map(...).)

Trio itself uses the last convention; for example, BlockingTrioPortal has run_sync and run methods. But that's for trio, where it's fair to assume that people are usually interested in doing async things :-). For your trio-ized version of streamz, you might want to do the same, or keep the sync versions undecorated for consistency with the regular version of streamz, or make everything explicit ... your call really.

Anyway, sounds like the issue in the title is resolved, so I'm going to close this. But I'm happy to keep chatting here or in the chat!
