Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discussion: what's the best strategy for "nowait" and "select" operations? #242

Open
njsmith opened this issue Jul 10, 2017 · 22 comments
Open

Comments

@njsmith
Copy link
Member

njsmith commented Jul 10, 2017

So from @wingo's blog I have learned about Concurrent ML. (His concurrency tag is also worth perusing generally.)

The core insight, IIUC, is to take the atomic blocking operations – roughly, the same ones our current cancellation guarantee applies to – and systematically split them into a few consistent sub-operations:

  • attempt to perform the operation synchronously, possibly wrapping the result in some thunk
  • publish that we're waiting for the operation to be doable and should be woken up if it might succeed
  • give up on waiting ("unpublish")

We already implicitly have this basic structure open-coded in a bunch of places, e.g. the _try_sync helper in trio/socket.py, the classes in trio/_sync.py, etc. Pretty much anywhere you see the yield_if_cancelled/yield_briefly_no_cancel pair in trio fits this general pattern, and "unpublish" is basically just our abort callback. So the advantages of reifying this would partly be just to simplify the code by having a single implementation of the overall pattern that we could slot things into – but even more, so because given the above pieces, you can create generic implementations of three variants:

  • Try to perform the operation, but fail if it would block (like the *_nowait variants that we currently implement in an ad hoc way)
  • Perform the operation, blocking as necessary (what we do now)
  • A thing like golang's select, where you can say "perform exactly one of the following operations: ..."

(The first is done by just calling the "try this" sub-operation; the second you do by trying and then blocking if you fail, in a loop, with the unpublish operation as the abort callback; the third is done by calling a bunch of "try this" sub-operations and if one succeeds you're done, otherwise publish all the operations and go to sleep. There's some subtleties around knowing which operation woke you up, and when unpublish happens, etc., but that's the basic idea.)

Right now we have a bunch of manual implementations of await x() / x_nowait() primitives. It's not clear that we have enough, either; #14 has an example of a case where you semantically need accept_nowait, and for HTTP client connection pooling when you pull a connection out of the pool you need something like receive_some_nowait to check whether the server has closed it before you try to use it.

Also, a golang-style select is potentially quite useful but isn't possible right now (at least for trio's built-in operations, of course you certainly could build a golang-style channels library on top, but then select would only work on that library's operations). You can spawn some child tasks to try doing all the things concurrently, but there's no way to make sure that no more than one complete – for that you would need to be able to (1) guarantee that all of the operations really are cleanly cancellable, and (2) perform the cancellation synchronously with the first operation completing, which isn't possible if the last thing it does after committing its work is to call yield_briefly_no_cancel.

An ancillary benefit is that if we expose these things as a standard interface to users, then this would also serve as very clear documentation of which the actual atomic cancellable operations are.

But, there are also some issues, I think:

IOCP: the above pattern works for BSD-style non-blocking socket operations, but not for Windows IOCP operations (#52). You can implement cancellable blocking operations as IOCP calls (that's basically what they are), and nowait operations using Windows' non-blocking calls, but IOCP has no generic equivalent to epoll to implement wake-me-when-this-non-blocking-call-might-succed, which means that golang-select is not possible. All you can do is ask the kernel to start initiating all the operations, and then by the time you find out that, say, your recv has finished, then your send might also have finished. I guess this might be possible to work around: for send I'm pretty sure we can treat IOCP as a kind of epoll, and then use non-blocking send. For recv I'm not sure if this works and for accept I'm pretty sure it doesn't, but for these operations you can more-or-less fake them as being always cancellable by using a little user-space buffer to push back any result that you want to pretend didn't happen. Are there any other cases? Do we need to change the pattern to accomodate this? The pushback trick doesn't seem compatible with a strict separation between "try it" and "wake me when you want to try again" primitives.

The main problem with HTTP connection pooling isn't checking if the socket is readable – we can already do that in the specific case of a raw socket. It's that it isn't something you can reasonably abstract in terms of generic "streams". In particular, if you have an TLS-wrapped socket, then you actually don't want to check if the TLS layer has data available, you really do want to check the raw socket underneath. And in any case I'm not sure that this would help make it easier to implement receive_some_nowait as a generic feature on streams, because receiving on a TLS stream is a very complex operation that may require things like lock acquisition, and all of the operations above have to be synchronous. So maybe the HTTP connection pool case isn't really a good motivator anyway.

Lock.acquire and Lock.acquire_nowait are tricky because of fairness (#54); it's not the case that acquire is just a loop like while acquire_nowait fails: sleep until it might succeed, because that creates a race on wakeup. I don't think it's possible to implement a fair mutex using just the framework described above. The problem is that we really need the two operations to be "try it" and "block until the operation is done"; a retry loop just doesn't work. So maybe this is actually equivalent to the IOCP case? Maybe we need primitives:

  • try it without executing any cancel/schedule point
  • publish whatever we need to publish to be woken up (but don't actually sleep), with some callbacks for abort, reschedule, and we-got-successfully-rescheduled

I think this is flexible enough to implement fair synchronization primitives and handle all the public operations. E.g. for golang-select we would want to arrange so that when one operation gets rescheduled, then we immediately abort all the other operations, before waiting to actually be woken up – this would need to happen from the context of the task that's handing off the mutex (for example).

But... this isn't quite right for stuff like non-blocking socket operations, where you actually need a try-sleep-retry-sleep-retry-... loop. Need to think some more about this.

@njsmith
Copy link
Member Author

njsmith commented Sep 5, 2017

For reference, here's a minimal Queue that lets you select on get. The design would also support selecting on put with some small tweaks. Possibly useful for extracting more general patterns out of? I believe that the design there is actually fair and could be extended to implement the full Queue API, and would even be faster than what we currently have. (It's based on ideas I've been playing with in the context of #272.)

@njsmith
Copy link
Member Author

njsmith commented Sep 5, 2017

Maybe the key idea I needed to make the code in the previous comment work is: you need to make the actual operation happen synchronously with the reschedule, because:

  • you have to unregister the task everywhere synchronously with the call to reschedule (not with when the thing actually wakes up), otherwise you might get rescheduled twice. (Cf Should we call the abort_fn when rescheduling in general? #315)

  • if you want to support fairness, then you can't re-register after unregistering

  • which means that you can't unregister unless the operation is completed

  • therefore, the operation completion has to happen synchronously with the call to reschedule.

We've already moved in this direction for Lock because it makes condition variables simpler, and we'll probably go this way for Queue too regardless. So maybe it's a good general way to do things? And then there's primitives like Event and CapacityLimiter and Semaphore that basically work this way already because there isn't really anything to do on the wakeup path anyway.

OTOH, there are also operations where you simply can't perform the operation synchronously at reschedule time: Unix socket operations are like this, because it's possible for the select-equivalent to report a socket ready, but then get EWOULDBLOCK anyway and you just have to wait again. I guess we could support this by changing the abstraction boundary between trio.socket and trio.hazmat so that instead of just waiting for a socket to be readable/writable, you could actually pass through an operation for the IOManager to attempt to perform before waking up the task?

Are there other operations like this? IOCP is a whole other issue, but with IOCP it's literally impossible to get exactly-one-of-these semantics no matter what you do, so I guess we can write that off. I don't know that anyone actually cares about the ability to do exactly-one-of-these on sockets anyway. It's stuff like Queues where this generally comes up. (Though we probably want accept_nowait, so at least that might be good to fit into this pattern if this becomes The Way To Do nowait Functions. On the other other hand, forcing every Listener class to do this may be too far to go in search of a pointless consistency.)

@njsmith
Copy link
Member Author

njsmith commented Sep 6, 2017

@jchayat in chat described a use case they have for select-style semantics: https://gitter.im/python-trio/general?at=59b03698c101bc4e3a987289

More discussion: https://gitter.im/python-trio/general?at=59b07c66b16f26464212fe0c

It sounds like at least in this case, a multi-task strategy is a viable alternative, though @jchayat still feels:

it's much more natural to think about it as 1 sequential task with "select"

@njsmith
Copy link
Member Author

njsmith commented Sep 9, 2017

There are some fun error cases to handle.

If some wait setup call errors, then you have to go and abort all the previous wait setup calls to unwind

One way a wait setup call can error is if the same task is trying to wait twice on the same event, e.g. trying to simultaneously put two items into the same queue. Or how should this be handled? The annoying thing is that if we just iterate through the nowait versions first, and one of them succeeds, then we'll never notice that the two actions conflict. OTOH it seems difficult to make this succeed for the wait case, because who wants to deal with the complications and overhead of using a MultiDict to keep track of waiting tasks e.g. I guess the intermediate option would be to silently throw one of them away, but making this work with abort logic might be tricky.

@njsmith
Copy link
Member Author

njsmith commented Sep 9, 2017

Oh, another fun case: if we want to support this for ParkingLot, and in particular Conditions, then we need some replacement for the abort_func reassignment hack that ParkingLot.requeue currently uses.

@njsmith
Copy link
Member Author

njsmith commented Dec 24, 2017

@xgid
Copy link

xgid commented Dec 28, 2017

Thanks for sharing those links, @njsmith. They have been quite revealing to me. The most insightful was indeed the Select statement considered harmful and the "Two Big Use Cases" idea. So simple, but so "demolishing" at the same time.

Of course, they are an attempt to over-simplify all concurrency problems under some general patterns... but that's exactly where their power resides!

@njsmith
Copy link
Member Author

njsmith commented May 19, 2018

Here's another place where a nowait might be useful. This time it's on Stream.receive_some: #536

I guess it'd actually be possible to have a "no wait" mode, that could handle this in a generic way without going full concurrent-ML: basically it would be like cancellation, except that calls to trio.hazmat.checkpoint() would be allowed to continue. (Well, maybe we'd need something special for wait_*_{readable,writable}, where you check for readability/writability first before blocking? And for IOCP, we'd want to switch to non-blocking calls instead of regular IOCP calls...) That still seems less scary than trying to add full concurrent-ML select support to abstract interfaces like Stream and Listener.

@njsmith njsmith changed the title Discussion: can we steal stuff from Concurrent ML? Discussion: what's the best strategy for "nowait" and "select" operations? May 19, 2018
@njsmith
Copy link
Member Author

njsmith commented Aug 18, 2018

In #586 (the PR for adding channels), we've run into an interesting problem that's actually closely related to this: put_handle.put blocks waiting for either someone to call get_handle.get or put_handle.close, and these need to be resolved atomically.

See: #586 (comment)

@njsmith
Copy link
Member Author

njsmith commented Oct 8, 2018

It looks like the accept_nowait issue may be resolved by #636, so that may disappear as a motivating example here.

@oremanj
Copy link
Member

oremanj commented Feb 16, 2019

I've been working on incorporating some of the CML ideas into Trio, representing operations as (synchronous) generator functions that yield twice. (Ten-second overview: the part before the first yield is "attempt", between the first and second yields is "publish", and after the second yield is "unpublish". At the first yield the operation is sent a handle, which it arranges for another task to call .complete() or .fail() on while it's suspended at the second yield. It gets resumed with that value or error, or gets closed if the operation is cancelled/etc.) Any operation implemented in this way automatically supports blocking await foo(...), nonblocking foo.nowait(...), and foo.operation(...) (which returns a thing that can be wrapped, selected, etc).

So far I've reimplemented ParkingLot.park, Event.wait, {Lock,Semaphore,CapacityLimiter}.acquire, and Condition.wait on top of this and all their existing tests pass. I haven't yet done memory channel send/receive or socket send/receive/accept but I don't anticipate any problems supporting them. I don't plan to even try to support IOCP or generalized streams.

I'm pretty pleased with the ergonomics of this approach so far in terms of making it easy to write low-level operations that are robust:

  • the common pattern of "wrap the nonblocking attempt with half-checkpoints, then block if you couldn't complete immediately" is in the core logic rather than in each operation
  • you can do simple operation composition with yield from, which nicely answers Can we make abort functions more composable? #896
  • the operation function can throw an exception anywhere and your program won't crash
  • the operation is identified by a completion handle which is allocated just for that operation, rather than by the Task, so "tried to wake up before I went to sleep" type errors give a useful exception rather than havoc

It's like a friendlier, more composable, less flexible (because you can't refuse a cancellation) wait_task_rescheduled.

I'm still polishing it, but I'm wondering if this is something that Trio might be interested in having in the core, or whether I should target an external library instead? Having it in the core seems like a substantial force multiplier -- if it's in a library, you can only compose operations in that library or its dependencies, which probably winds up resulting in that library providing its own synchronization primitives that mostly reimplement Trio's core ones. On the other hand, it's a nontrivial chunk of functionality (_core/_operation.py is 418 lines right now) and demand for select() hasn't been that strong. Thoughts?

@oremanj
Copy link
Member

oremanj commented Feb 16, 2019

Some examples:

ParkingLot.park():

@_core.atomic_operation
def park(self):
    handle = yield
    # keep the current task so we can return it from unpark() --
    # but none of the _sync ops require it anymore
    self._parked[handle] = _core.current_task()
    handle.custom_sleep_data = self
    try:
        yield
    finally:
        del handle.custom_sleep_data._parked[handle]

Event.wait(): simple delegation

@_core.atomic_operation
def wait(self):
    if self._flag:
        return
    else:
        yield from self._lot.park.operation()

Condition.wait(): delegation with extra publish step and async cleanup

@_core.atomic_operation
def wait(self):
    task = _core.current_task()
    if task is not self._lock._owner:
        raise RuntimeError("must hold the lock to wait")

    # NOTE: we go to sleep on self._lot, but we'll wake up on
    # self._lock._lot. That's all that's required to acquire a Lock.
    handle = yield self._lot.park.operation()
    self.release()
    try:
        yield
        self._lock._owner = task
    except:
        handle.add_async_cleanup(self.acquire)
        raise

Memory channel send/receive:

class MemoryChannelState:
    ...
    @_core.atomic_operation
    def send(self, value):
        if not self.open_receive_channels:
            raise _core.BrokenResourceError
        if self.receive_ops:
            assert not self.data
            receive_handle = next(iter(self.receive_ops.keys()))
            receive_handle.complete(value)
        elif len(self.data) < self.max_buffer_size:
            self.data.append(value)
        else:
            send_handle = yield
            self.send_ops[send_handle] = value
            try:
                yield
            finally:
                del self.send_ops[send_handle]

    @_core.atomic_operation
    def receive(self):
        if self.send_ops:
            send_handle, value = next(iter(self.send_ops.items()))
            send_handle.complete()
            self.data.append(value)
            # Fall through
        if self.data:
            return self.data.popleft()
        if not self.open_send_channels:
            raise _core.EndOfChannel
        receive_handle = yield
        self.receive_ops[receive_handle] = None
        try:
            return (yield)
        finally:
            del self.receive_ops[receive_handle]
...
# _closed changed from a bool to an Event
class MemorySendChannel:
    @_core.atomic_operation
    def _fail_when_closed(self):
        yield from self._closed.wait.operation()
        raise _core.ClosedResourceError

    @_core.atomic_operation
    def send(self, value):
        yield from _core.select.operation(
            self._fail_when_closed.operation(),
            self._state.send.operation(value),
        )
...
class MemoryReceiveChannel:
    @_core.atomic_operation
    def _fail_when_closed(self):
        yield from self._closed.wait.operation()
        raise _core.ClosedResourceError

    @_core.atomic_operation
    def receive(self):
        return (
            yield from _core.select.operation(
                self._fail_when_closed.operation(),
                self._state.receive.operation(),
            )
        )
    ...

@njsmith
Copy link
Member Author

njsmith commented Feb 16, 2019

Oh cool!

I'm still polishing it, but I'm wondering if this is something that Trio might be interested in having in the core, or whether I should target an external library instead? Having it in the core seems like a substantial force multiplier -- if it's in a library, you can only compose operations in that library or its dependencies, which probably winds up resulting in that library providing its own synchronization primitives that mostly reimplement Trio's core ones. On the other hand, it's a nontrivial chunk of functionality (_core/_operation.py is 418 lines right now) and demand for select() hasn't been that strong. Thoughts?

I think this is too complicated a decision to make quickly :-). But having actual code will tell us a lot!

Some things I want to understand better before forming any conclusions:

  • What does the select syntax look like? How do you tell which operation completed? What are the semantics for picking which operation if multiple are complete-able?
  • Is select really useful? (1) If it were available, would people use it? (2) If they do, then does it make their code better or worse?
  • What do the docs for writing these operations like this look like? How understandable are they? Do they give better ergonomics to implementors than what we have now?
  • How does this affect fairness?
  • What happens if someone passes the same operation twice to a single call to select?

...Looking at that list, some of those feel like they're going to need to cook for a while before we can draw conclusions. So maybe it makes most sense to put it in a library for now? It's true that if it does turn out to be a big win then having it in the core will act as a force multiplier, but we can probably learn things without that?

@oremanj
Copy link
Member

oremanj commented Feb 16, 2019

Cool, glad this seems interesting! I'm going to keep prototyping it as a PR to Trio because that gives a nice collection of ready-made examples (and tests!) of how it might simplify low-level operations, but I won't be sad if you want to hold off on reviewing/merging for a while/ever. :-)

What does the select syntax look like? How do you tell which operation completed? What are the semantics for picking which operation if multiple are complete-able?

If you want to select between await send_channel.send("foo"), await receive_channel_1.receive(), and await receive_channel_2.receive(), you would write:

result = await trio.hazmat.select(
    send_channel.send.operation("foo"),
    receive_channel_1.receive.operation(),
    receive_channel_2.receive.operation(),
)

result will be None (if the send won) or whichever value was received. For "which one was it?":

@trio.hazmat.atomic_operation
def tag_with_self(operation):
    return (operation.args[0], (yield from operation))

@trio.hazmat.atomic_operation
def select_with_self_tag(*operations):
    tagged = [tag_with_self.operation(operation) for operation in operations]
    return (yield from trio.hazmat.select.operation(*tagged))

which_channel, result = await select_with_self_tag(
    # ... as above ...
)

Of course select_with_self_tag could be packaged for general use, or could be select(..., tag="self"), or whatever. There are a bunch of these combinators one could imagine; I've been focusing on the core functionality for now.

(Should these be in hazmat? Right now I have them in hazmat, because writing combinations of unevaluated operations is a little bit of a departure from trio's traditional worldview. So far select and the @atomic_operation decorator are the only two exported names.)

Is select really useful? (1) If it were available, would people use it? (2) If they do, then does it make their code better or worse?

This is probably one of those questions that requires some time to marinate :-) but my intuition here is that select() will be a useful substitute for some reasonable set of things that currently get implemented as "start some tasks in parallel, cancel the others when the first one finishes; if another one finished too before it gets your cancellation, awkwardly try to stuff that cat back into the bag".

Operations are also useful even without select, because of how the composition mechanisms let you run additional "unpublish" code synchronously with the reschedule/abort.

What do the docs for writing these operations like this look like? How understandable are they? Do they give better ergonomics to implementors than what we have now?

Definitely a major open question! I'll see what I can come up with :-)

How does this affect fairness?

Each operation (including each individual branch of a select()) gets its own completion handle, which can go in the wait queue of your choice. When one of them gets called, the others synchronously remove themselves. I don't understand some of the WFQ discussions well enough to know whether this system would compromise our ability to move in that direction, but it plays totally fine with the current strict-FIFO fairness.

What happens if someone passes the same operation twice to a single call to select?

They both get published (in left-to-right order). Everything is indexed by the completion handle, not the task, and each branch of the select() gets a different completion handle. Whichever one gets completed first will synchronously unpublish the other one. So it's kind of a silly thing to do, the second copy of the operation doesn't add anything, but it's not going to totally blow up.

(All the things I'm calling "operation" are wrappers around the generator function plus its arguments, so there's no issue with calling the same one multiple times. This is needed to support retrying, to preserve BSD socket semantics. Internally there are also "opiters" i.e. generator iterators of operation functions, but those aren't exposed publicly.)

@oremanj
Copy link
Member

oremanj commented Feb 16, 2019

Code is on the operation branch in my fork (master...oremanj:operation). I'd encourage you not to spend too much time staring at trio/_core/_operation.py there, since it has almost no comments yet, but the examples of how it's used are probably interesting.

@njsmith
Copy link
Member Author

njsmith commented Feb 16, 2019

The add_async_cleanup thing is interesting. Does it run in all cases, or just if the operation is cancelled?

I was looking at this article again, and its running example for a novel selectable operation is swap, which in their implementation involves doing a blocking operation after committing to the operation. Their implementation is safe in practice because they have some abstraction boundaries that mean the other side is definitely also using swap, so as soon as we send our value we know they'll immediately send back their value, and it doesn't block in practice. But the type system doesn't guarantee it. So there's some question whether it's better to allow async operations here and just tell people to be really careful which ones they use, or to require the code be synchronous so it can't block. https://github.com/agronholm/typeguard`Condition.wait` is a total oddball in general, so I'm not sure how much it should affect our judgement. (Making it non-selectable also seems like an OK outcome.)

I think a closely related issue is how in classic concurrent ML, when you write a select you also specify some code to run on each possible branch. In simple cases, these are equivalent:

# concurrent ML style
await select({
    send_channel.send.operation("foo"): lambda _: print("sent"),
    receive_channel.receive.operation(): lambda value: print(f"got: {value}"),
})

# prototype style
branch, value = await select_with_tag({
    send_channel.send.operation("foo"): "sent",
    receive_channel.receive.operation(): "received",
})
if branch == "sent":
    print("sent")
elif branch == "received":
    print(f"got: {value}")

But, they're different in a crucial case: in the first one, if you make it select.operation(...), then you've created a new atomic operation, that incorporates the print calls and everything. I think in concurrent ML this is the primary way that you compose new atomic operations?

The yield from trick is clever, but I wonder if it would be better to pay the syntactic tax to let select incorporate handlers for each branch, and use that instead. Another possible approach:

select_builder = SelectBuilder()
@select_builder(send_channel.send.operation("foo"))
def handler(_):
    print("sent")
@select_builder(receive_channel.receive)
def handler(value):
    print(f"got {value}")
new_op = select_builder.finish()

...oh, I see, but then this is pretty awkward when you want to actually wrap this up into a new public API like your MemoryReceiveChannel.receive. I guess we'd need @wrapped_atomic_operation or something? Bleh.


another random thought:

await select(
    partial(send_channel.send, "foo"),   # or trio.withargs or something, maybe
    receive_channel.receive,
)

@oremanj
Copy link
Member

oremanj commented Feb 16, 2019

The add_async_cleanup thing is interesting. Does it run in all cases, or just if the operation is cancelled?

It always runs if control flow reaches the add_async_cleanup call. You make the add_async_cleanup call in the "unpublish" phase of an operation function, though, so you can do it only on success or only on failure if you like. (If it wasn't clear from the examples, the operation result value/error is sent/thrown at the second yield.) Currently it's not possible to distinguish "the entire select() was cancelled" from "a different branch won" -- they both throw in GeneratorExit. I think that's probably good for composability.

The actual async cleanup functions run, shielded, in the woken task after it gets rescheduled. If there are multiple, the current implementation runs them in parallel in a nursery; maybe this should be one-at-a-time in the same order as the sync cleanups, but then it gets confusing that we run all the sync cleanups before any of the async ones. I added async cleanup mostly to support wait(). It could also support the swap() operation you linked, and could probably be shoehorned into supporting things that are "undoable but not immediately cancellable", like receiving a fixed-size message on a buffered stream (you push any excess part back into the buffer). I say "shoehorned" because the implementation would involve a system task and I don't know if there's a use case that's worth it. (I haven't implemented that one and don't have any near-term plans to.)

I don't think async cleanup is too much of an attractive nuisance given that it's kind of awkward to use it (you have to define a separate async function, you can't just write await in your operation function). We definitely do need to be able to collect and run all the synchronous unpublish parts without checkpointing, so I don't think there's any way to let people write await in the operation function without an excess of foot-shooting.

I think a closely related issue is how in classic concurrent ML, when you write a select you also specify some code to run on each possible branch.

Yep, the two cases (running as part of the operation vs running afterward) definitely carry different semantics and supporting both of them makes sense. One especially notable difference in my implementation is:

  • any wrappers (like your print lambdas) that run as part of the operation effectively are part of the "unpublish" phase
  • ... which runs in the waking task, so it can be synchronous with the reschedule, for all the reasons we've discussed that being desirable
  • ... so they can't use contextvars, current_task(), etc, which is a bit of a footgun
  • ... so maybe it's good if they're a little more annoying to use than the run-once-operation-completes version? There are definitely some that shouldn't be annoying, but we can write non-annoying combinators for them. And "wrap the result in this lambda" is an easy one, although given the state of Python's lambdas it'll never be completely non-annoying. :-)

[using e.g. partial(send_channel.send, foo) as a selectable]

My first draft did some magic so all of

await send_channel.send(value)
send_channel.send(value).nowait()
await select(send_channel.send(value), receive_channel.receive())

would work. I deemed that too magical and switched to the current version where the way you're planning on using the function gets written before the parens. It could change back though. One wrinkle is: how do you turn partial(send_channel.send, value) into an operation? If you expect to get something useful from calling it, it's tricky to also maintain the "coroutine was never awaited" warning. I guess you can look inside the partial object without calling it, but that seems error-prone to me.

Currently I have the post-decorator send_channel.send(value) directly return a coroutine object for the internal perform_operation(send_channel.send.operation(value)) (it uses def ...: return ... rather than async def ...: return await ...). The places that expect an operation know how to detect that they were passed a perform_operation coroutine object, and if they get one they pull out the operation object from f_locals and throw an exception "use MemorySendChannel.send.operation(...), not MemorySendChannel.send(...)". Changing that to just use the operation object instead of complaining would be easy, but maybe we don't want to train our users to write some_async_operation(args) without an await in front of it ;-)

I guess we could combine these tricks, actually: if you're expecting an operation, and you get a callable instead, try calling it with no arguments; if you get a coroutine object for perform_operation back, use coro.cr_frame.f_locals["operation"] as your operation and call coro.close() to suppress the unawaited warning. Too magic, or just the right amount? There's something that does feel very "trionic" about writing await select(<list of async thunks>). On the gripping hand, it could get quite confusing that this doesn't work for any async thunk... more thought needed.

Side note: I discovered that it is much harder than I thought to write an awaitable object using an async function. Edit: nope, I just didn't realize you can call __await__() directly on a coroutine object. Order has been restored to the universe.

@smurfix
Copy link
Contributor

smurfix commented Feb 16, 2019

Demand for select hasn't been that strong because, well, there's no such thing yet, thus we restructure stuff so that we can compose things with tasks instead, thenceforth we don't need select any more.

However, there are a couple of pieces of code I'd like to translate from Go to Python (because, you know, Python ;-) ) and having select would make that kind of job a whole lot more straightforward.

@glaebhoerl
Copy link

The blog post in the OP also mentions Reagents as a generalization of CML. From the comments:

Reagents do generalise CML. The main difference is that CML only allows you to combine operations with select (op1 OR op2) while Reagents also allow you to combine operations into a transaction (op1 AND op2 for independent operations and op1 THEN op2 for dependent ones).

Basically, using this comment's notation, THEN performs op1 and op2 sequentially, passing the output of op1 as the input of op2, while AND performs them in parallel, returning the output of both op1 and op2 in a tuple (which basically corresponds to join patterns). The key is that all these forms of composition can be combined arbitrarily, so you can AND things together and then use the resulting operation as a single unit in a select, or whatever. (More links, in the context of OCaml this time.)

Reagents was originally conceived as a low-level library for lock-free programming:

Reagents are lightweight in that the library analyses the combined operation and then figures out an efficient CAS scheme to execute it. Reagents also include some more low level operations, such as CAS as a reagent.

But I don't know whether that is somehow intrinsic to the whole concept, or if the basic programming model is separable from that. (I'd guess that it would be, but it's just a guess.)

Anyway, I don't know how relevant any of this might be to Trio :), just figured it might be worth a mention. @oremanj mentioned that select can be emulated to some extent using structured concurrency and cancellation, so it makes me wonder if there are any other connections.

@njsmith
Copy link
Member Author

njsmith commented Feb 16, 2019

@oremanj

One wrinkle is: how do you turn partial(send_channel.send, value) into an operation? If you expect to get something useful from calling it, it's tricky to also maintain the "coroutine was never awaited" warning. I guess you can look inside the partial object without calling it, but that seems error-prone to me.

Yeah, partial objects are introspectable so I was imagining partial_obj.func.operation(*partial_obj.args, **partial_obj.kwargs). We do something similar when trying to sniff good task names. It would mean we would only support partial objects, not e.g. the equivalent lambda, but that doesn't seem worse than only supporting .operation objects? Partly this depends on how #470 resolves, but if we end up sticking with always using partial as our standard way to represent unevaluated thunks then using them here would be pretty consistent.

@smurfix

However, there are a couple of pieces of code I'd like to translate from Go to Python (because, you know, Python ;-) ) and having select would make that kind of job a whole lot more straightforward.

Well, select will at the least exist in a library you can use :-)

@glaebhoerl

The blog post in the OP also mentions Reagents as a generalization of CML

Heh, I just stumbled on those through some completely different path yesterday... I actually have Aaron Turon's thesis open, but I haven't wrapped my head around these yet :-).

However, based on first principles, I feel like there must be some pretty substantial limitations. In Trio, you can sorta fake select by opening a nursery and racing all the operations against each other, and as soon as one finishes cancel the rest. BUT, this misses out on a crucial part of CML's select semantics: select guarantees that exactly one of the operations completes, and the rest don't happen at all. With the fake version, there's no way to rule out having two operations complete at the same time. The atomicity guarantee is what makes select strictly more powerful than other operations we have.

But it also majorly limits what kinds of operations you can select on: you need a single instant-in-time where you realize that an operation is going to commit, and then at that instant, you need to be able to roll back all the other operations that aren't going to commit.

In general, OR preserves this kind of atomicity, but THEN and AND do not. There may be ways to save it for particular operations – e.g. with AND, if the left operation becomes eligible to commit first, maybe you can somehow pause it there while waiting to find out if the right operation commits? (But this is user-visible: if you have unbuffered channels and do chan1.receive AND chan2.receive, it means chan1.send will block until someone does chan2.send.) In the case of Reagents, I think the solution is that all of its operations have to be compiled down to a big kCAS? So e.g. I'm pretty sure it's possible with CML to make operations like sock.send(...) selectable, but I don't see how it could be possible to make them reagents operations.

OTOH, that might still be powerful enough to implement all the operations that we actually want to support select on...

@njsmith
Copy link
Member Author

njsmith commented Jun 2, 2019

This article seems relevant here: https://medium.com/@elizarov/deadlocks-in-non-hierarchical-csp-e5910d137cc

@bergus
Copy link

bergus commented Mar 3, 2020

In #1411 I was asked to present my uses cases for select() here. I had two actually:

  • The first is some kind of message queue/protocol endpoint. Similar to Add "one obvious way" for implementing the common multiplexed request/response pattern #467, I send requests to different objects and get back responses from them, however there is no unique matching of a response to a request by id. Instead, I send a request for x and get back one of multiple possible responses regarding x, say a confirmation or an error. The initial idea was to have a channel for confirmations and a channel for errors (per object ofc), which would have been a nice api surface also for requests that cause multiple responses.
    Now I would need to select() exactly one message from the channels from which I expect my single response. I must not receive from both channels at once, the second message actually is the response to the next request (and should be returned from the next .receive() call).
    I gather mailboxes like in Erlang would be the best solution for this. For now, I implemented a solution with callbacks, installing "matchers" in a queue which are evaluated in the receiver task until it finds one that can handle the message. Then it removes all belonging to the same "group" from the queue, so that none will be used again, before sending the single message to the respective "communication" task (who installed the matchers) with help of a dedicated event or channel.

  • A state machine.
    Yes, I've read the article series on "Structured Concurrency" linked above, and kinda agree with the author that all state machines can equivalently be expressed with communicating concurrent tasks instead. However, not for all problems this is actually simpler or easier to understand, especially when your domain objects are specified as state machines already.
    My particular problem is about getting a message with a count, and then doing that many requests for resources. However, the number of in-flight requests is limited, similar to a CapacityLimiter but with dynamic token sizes so I had to roll my own solution there. Also, during that process one might get new count messages, and adjust the requests so that the number of responses received after the last count matches that number. (Notice I do not want to cancel in-flight requests, but I might want to cancel waiting for the next available token, but only if I need none any longer).
    I arrived at the following solution:

    results = []
    expected_length = 0
    sent_requests = 0
    
    async def do_request(token):
        with token:
            res = await self.send_request_and_receive_response()
        results.append(res)
        if len(results) == expected_length:
            nursery.cancel_scope.cancel() # stop handle_count_updates
    
    sending = None
    async def do_all_requests():
        nonlocal sent_requests, sending
        if sending:
            return
        sending = trio.CancelScope()
        with sending:
            while sent_requests < expected_length:
                token = await self.get_request_token()
                nursery.start_soon(do_request, token)
                sent_requests += 1
        sending = None
    
    async def handle_count_updates():
        nonlocal sent_requests, expected_length
        async for count in self.count_messages:
            sent_requests -= (count - expected_length) # 0 + still in flight
            expected_length = count
            results = []
            if sending and sent_requests >= expected_length:
                sending.cancel()
            else:
                nursery.start_soon(do_all_requests)
    
    async with trio.open_nursery() as nursery:
        nursery.start_soon(handle_count_updates)
    return results

    This basically has the state components (sent_requests, expected_length) spelled out, then for each possible event there's a separate receive loop in a separate task, needing to use nonlocal to manipulate the state. In contrast, with SELECT this seems so much simpler (and also shorter):

    results = []
    expected_length = 0
    sent_requests = 0
    
    async def do_request(token):
        with token:
            res = await self.send_request_and_receive_response()
        results.append(res)
        if len(results) == expected_length:
            nursery.cancel_scope.cancel() # stop receiving count_messages
    
    async with trio.open_nursery() as nursery:
        waiting_token = NEVER()
        while True:
            token, count = await SELECT(waiting_token, self.count_messages.receive())
            if token is not None:
                nursery.start_soon(do_request, token)
                sent_requests += 1
                waiting_token = self.get_request_token() if sent_requests < expected_length else NEVER()
            if count is not None:
                sent_requests -= (count - expected_length) # 0 + still in flight
                expected_length = count
                results = []
    return results

    (General feedback welcome as well)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

6 participants