Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] Async I/O with structural control flow (a.k.a. Enforced Awaits) #2

Open
zah opened this issue May 29, 2018 · 17 comments
Open

[RFC] Async I/O with structural control flow (a.k.a. Enforced Awaits) #2

zah opened this issue May 29, 2018 · 17 comments

Comments

@zah
Copy link
Contributor

zah commented May 29, 2018

EDIT: 2020-02-12 Expanded the introduction section with more details explaining the existing problems in non-structural async.

Abstract

This is a proposal to change the semantics of the async procs in a way that enforces a more structural control flow. The goal of the new APIs is to force you to await your async operations, while still allowing you to easily execute multiple operations in parallel. The proposal eliminates a large category of usage errors with the old APIs and enables some additional optimisations such as storing your Future[T] results on the stack and creating async procs consuming stack-based openarray[T] inputs and var parameters.

For a more comprehensive set of rationales for enforcing the structural control flow proposed here, please read the following article:

https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/

Acknowledgements: some of the key ideas in this proposal were first suggested by @yglukhov in private conversations.

Current problems:

P1) All non-ref input parameters to async procs must be copied

Consider the following async proc:

proc checkBrokenLinks(uris: seq[Uri]): Future[seq[bool]] {.async.} =
  ## Tests all supplied URLs in parallel and returns
  ## whether they are still reachable or not.
  ...

If this wasn't an async proc, Nim would pass the supplied input sequence as a read-only reference (please note that I'm using C++ terminology here). This relies on the fact that the lifetime of the sequence at the call-site will surely extend until the moment the function delivers its result.

Unfortunately, in the async world this is no longer the case. The caller of checkBrokenLinks is free to use it like this:

proc brokenCode: Future[seq[bool] =
  var uris = @[
    Uri.init("git://github.com/status-im/nim-chronos"),
    Uri.init("https://status.team")
  ]
  let brokenLinksFut = checkBrokenLinks(uris)
  ...  
  return brokenLinksFut

If the uris sequence was passed by reference, the async proc may be resumed after brokenCode returns which may result in accessing the now dangling reference. To avoid this problem, the Nim compiler makes sure to copy all input parameters of the async proc to corresponding fields in the "environment" struct associated with the async proc's closure iterator. This copying may be quite expensive for value types such as string and seqs and the users are advised to avoid using such type in async procs and to prefer using ref parameters where only a pointer must be copied.

P2) var and openarray parameters are not supported

As a corollary from the previous problem, it becomes impossible to use var and openarray parameters with async procs, because these require the input data to be passed by reference.

P3) The async/await syntax has easily-accessible degrees of freedom that may be dangerous for novice users

Consider the following simple async proc:

proc terminateConnection(s: AsyncSocket) {.async.} =
  var myDisconnectMsg: array[X, byte]
  prepareDisconnect(myDisconnectMsg)
  var res = s.send(addr MyDisconnectMsg[0], X) # oops, forgot to call await here 
  s.close()

It showcases two critical problems triggered by a simple omission of await in the code:

  1. The socket will be closed prematurely.
  2. The send operation will be working with bogus data.

This proposal argues that the default behavior should be completely safe and impossible to misuse, while the more advanced concerns such as enabling parallel execution could be handled with a more specialized syntax.

The Proposed Solution:

We create a new set of APIs that hide the explicit use of Future values in the user code and enforce awaiting of all async operations. If all operations are awaited, it becomes possible to store the inputs of the said operations in the "pseudo stack" associated with the async proc, which in turn enables the use of the reference types such as lent, var and openarray providing much better safety than the current pointer/len inputs.

So, here is the full set of new APIs:

1. Allow await to be given multiple arguments or a tuple

proc httpRequest(url: string): Future[HttpResult]
proc jsonRpcCall(url: string): Future[JsonNode]

proc foo {.async.} =
  var keyword = "Status"
  var (googlePage, jsonData) = await(httpRequest(&"http://google.com/?q={keyword}"),
                                     jsonRpcCall("localhost/myApi"))
  
  echo "HTTP RESPONSE ", googlePage.status, "\n", googlePage.body
  echo "JSON RESPONSE\n", jsonData{"result"}

This form of await just performs the I/O operations in parallel returning a tuple of the final results. It is similar to using var r1 = request(); var r2 = request(); await all(r1, r2) in the current design.

For convenience await (foo, bar) is considered the same as await(foo,bar).

2. Introduce a new select API (EDIT: this point is made partially obsolete by point 4)

select is a new API that is given a number of I/O operations that should be started in parallel. The key difference from await is that the handlers installed for each operation will be executed as soon as any of the results are ready. Control flow keywords such as return and break can be used to cancel some of the outstanding operations:

proc foo {.async.} =
  var keyword = "Status"
  var timedOut = false

  select:
    httpRequest(&"http://google.com/?q={keyword}") as response:
      # executes this as soon as it's ready
      echo "HTTP RESPONSE ", googlePage.status, "\n", googlePage.body
 
    jsonRpcCall("localhost/myApi") as jsonData:
      echo jsonData{"result"}
      return # returns from the current proc; skips the other handlers

    *timeout(100):
      # `timeout` is the same as `sleepAsync`
      timedOut = true
      break # continues after the select; skips the other handlers

  echo "async ops ", if timedOut: "timed out" else: "finished"

The execution after the select block continues when all of the handlers have been executed, although there must be a way to mark some of them as optional (here, I've used * for this).

The named results are considered in scope after the select statement. You can choose to only name a particular result without providing a handling block.

3. Introduce a new safeasync pragma (EDIT: this may well be the default mode)

The safeasync pragma is responsible for inserting the await keyword in automatic way. It also has the role of the current multisync pragma in the sense that it allows you to compile the same code for both sync and async usage:

proc foo: bool {.safeasync.} =
  var keyword = "Status"
  # Notice how I don't need to use await anymore
  var (googlePage, jsonData) = (httpRequest(&"http://google.com/?q={keyword}"),
                                jsonRpcCall("localhost/myApi"))

  return googlePage.status == 200 and not jsonData.hasKey("error")

How does this work? It inserts a call to a template called implicitAwait on each expression within the proc's body. implicitAwait is defined as an identity for all non-future types and as a regular await statement for all futures:

template implicitAwait(x: auto): auto = x
template implicitAwait(x: Future): auto = await x

Please note that the body of a safeasync will work in synchronous mode by executing each operation in turn. It's also possible to compile the code for implicit off-loading to a background thread pool in programs that don't feature an asynchronous event loop.

Appending 3.A

Please note that using the await statement may still be supported inside safeasync procs. One may use it to improve the code clarity. It's also possible to implement safeasync in an alternative way that requires the use of await and signals any omission as an error, but the arguments for this are not very strong - in all code there might be significant differences between operations that are algorithmically cheaper or heavier. It's usually the names of the operations that reveal where the I/O waits will happen.

4. Support async operations in parallel blocks

I'm extending the proposal to also enhance Nim's parallel construct with additional support for async operations. This proposal can replace the need for a separate select API, although it could still exist as a simple high-level helper. The new features are the following:

Within parallel blocks:

4.1) Allow spawn to be followed by a do block that will be executed with the result of the operation, once complete.

4.2) Allow spawn to be used with procs returning Future[T] results. spawn immediately starts the async operation and it adds the Future to a list of tasks to be awaited just before the exit of the parallel block. This enforces the structural handling of the async operations, but one can still work with the returned futures in the familiar fashion - passing them to helper procs, setting up callbacks and so on. It is guaranteed that the callbacks will be executed in the same thread that has entered the parallel block.

4.3) Add a new call called spawnOptional that launches non-critical parallel operations. If the parallel block is able to complete before all such operations have completed, they are simply cancelled.

4.4) Support break and return in parallel blocks by cancelling all outstanding operations.

With such an API, the select example above becomes:

proc foo {.async.} =
  var keyword = "Status"
  var timedOut = false
  
  parallel:
    spawn httpRequest(&"http://google.com/?q={keyword}") do (response):
      # executes this as soon as it's ready
      echo "HTTP RESPONSE ", googlePage.status, "\n", googlePage.body
 
    spawn jsonRpcCall("localhost/myApi") do (jsonData):
      echo jsonData{"result"}
      return # returns from the current proc; skips the other handlers

    spawnOptional timeout(100) do:
      # `timeout` is the same as `sleepAsync`
      timedOut = true
      break # continues after the select; skips the other handlers
  
  echo "async ops ", if timedOut: "timed out" else: "finished"

Please note that such a parallel block will be more powerful than the select construct, because it enables you to add multiple tasks to be awaited from a loop.

The use of parallel blocks and spawn comes at a cost. All parameters passed in the spawn expression must be copied inside the spawned task. Please note that this matches precisely the behavior of spawn when it comes to sending computational tasks to a thread pool as well.

4.5) Introduce an underlying object representing the "parallel block" and create an accessor for it (e.g. a thisParallelBlock magic valid only inside the block). This object will feature operations such as addThreadJob, addAsyncIO, addOptionalAsyncIO. It's the equivalent to the nursery object described in the article linked in the abstract. Its goal is to enable the creation of helper libraries that perform something with the parallel block context.

parallel:
  addJobs(thisParallelBlock)

4.6) Define the exception-handling semantics inside parallel blocks - if an exception is thrown by a spawned task inside a parallel block, this exception will be re-raised in the thread that has entered the block. All other spawned tasks are cancelled.

5. Support async tasks in spawn outside of parallel blocks.

This is an escape hatch that will replace the current usages of asyncCheck and traceAsyncErrors. Semantically, it spawns a new "async thread" of execution. Just like when spawning a regular thread, all parameters passed to the spawn expression must be copied or moved in the new thread. The spawned function must annotated with raises: [Defect]. If it terminates with a Defect, the whole process is also terminated.

6. Migration path for the current async

A semi backwards-compatible async pragma can be added to serve as a drop-in replacement for the existing async pragma. It will differ in only one way. All started async operations will be added to a list that is awaited at the end of their scope. This is not strictly backwards-compatible, but most of the existing async code should not be affected by the change in semantics.

@cheatfate
Copy link
Collaborator

cheatfate commented May 30, 2018

Both point 1 and point 2 can be replaced by primitive like this: https://docs.python.org/3/library/asyncio-task.html#asyncio.wait.

From my experience, there no need on

var (a, b, c) = await(afut, bfut, cfut)

because it used very rare in real world, most of the time you filling array of tasks and waiting for any of them or for all of them. Also you will not find many lines of code which uses and or or constructions.

@zah
Copy link
Contributor Author

zah commented May 30, 2018

The dynamic interface in Python allows for working with heterogeneous result types, while in Nim you have to resort to having named Future variables from which the results can be extracted (losing the "structured control-flow" aspect of the design).

The key difference in the new proposal is that the result variables have their proper type in all places and the explicit use of futures is hidden.

Otherwise, I've considered the case of requesting a sequence of homogenous operations. I think this should be a separate request throttling API. (EDIT: this use case is now covered by the parallel block enhancements).

I'd also say that the need for heterogeneous parallel I/O is quite common in web development for example where it is typical for a web request to lead to several sub-requests to different micro-services, databases and so on.

@cheatfate
Copy link
Collaborator

asyncdispatch design allows you to accept seq[FutureBase] or varargs[FutureBase] and its enough to check readiness of Future.
Also extracting results from Future is extracting exceptions too, with your proposal its unclear how this construction must work if one of Future will fail, while python's wait allow you to specify how to react on Exception.

@zah
Copy link
Contributor Author

zah commented May 30, 2018

I don't argue about the low-level details. Nothing about asyncdispatch should change. The goal is to remove the explicit use of futures from the high level interface, so certain optimizations are easier and the code is safer (please read the linked article).

I don't think there is an issue with exceptions. The semantics of await are clear - all operations must complete, else an exception will be raised. It's also possible to introduce additional syntax in the API to let you preserve some of the results as futures (e.g. await (wrapErrors request(), bar())). The sync/async duality makes it very clear how to think about exceptions in all cases - you can always ask yourself "what would the code do in sync mode"?

With select, again all mandatory operations must complete, otherwise an exception will be thrown. We can also allow except branches for each request giving you a chance to handle the exception manually. Additional syntax will also be required for implementing progress events.

@cheatfate
Copy link
Collaborator

Sorry but i dont see any optimizations or safe code. Everything i see is proposal to include some more primitives to existing primitives, and add even more to resolve all possible issues.
Also i don't want to think about sync mode, there no reason to think about it, if people wants to be synchronous why they need asynchronous overhead.

What is proposed await modification? Its obfuscated version of

await wait(abc, bcd, ALL_COMPLETED)

What is proposed select? Its obfuscated version of

await wait(abc, bcd, cde, ALL_COMPLETED)

Most of people do not understand asynchronous programming with current API, so why do you think adding more API will help it understand better?

@zah
Copy link
Contributor Author

zah commented May 30, 2018

@cheatfate, if it's possible to design the system in a way that avoids heap allocations and reliance on the GC, it would be silly not to take advantage of it.

The specific optimization being discussed here is placing the Future objects within the stack of the caller. At the moment this is still the heap-allocated environment of the closure iterator, but this can be attacked with further optimizations in the compiler (escape analysis for the closure objects, better support for closure/iterator monomorphism in the run-time code). In the end it may be possible to store all the required state within the single value associated with the registered I/O operation. Rust achieves this.

Also, many async procs don't actually need to be compiled as iterators:

proc foo: Future[int] {.async.} =
  code code code  
  ...
  let v = await bar()
  ...
  return v

is equivalent to the sync proc:

proc foo: int =
  code code code
  ...
  let v = waitFor bar()
  ...
  return v

EDIT: The claim above is not correct in general, because if a second waitFor loop is entered while the first one is still waiting, it won't be able to awake the original one immediately, thus creating a change in the execution semantics.

I'd rather have an API that lets me experiment more easily with such changes in the compilation strategy.

Do you have any alternative proposal that can ensure all of the above? For the safety, I've given specific examples of problematic code that will be eliminated under the new APIs.

And finally, here is the full equivalent example under your alternative:

var
  googlePageFut = httpRequest(&"http://google.com/?q={keyword}")
  jsonDataFut = jsonRpcCall("localhost/myApi")

await wait(googlePageFut, jsonDataFut)

let
  googlePage = googlePageFut.read 
  jsonData = jsonDataFut.read

return googlePage.status == 200 and not jsonData.hasKey("error")

Why shouldn't we strive to eliminate this boilerplate?

@cheatfate
Copy link
Collaborator

cheatfate commented May 30, 2018

@zah, i think you missing not something but alot, or maybe somebody misled you with this optimizations. We already talked with @yglukhov, that if variable passes through state, it become part of closure environment, not stack, so all optimizations will not give you so many benefits as you expect.

Also i think you need to read more about rust implementation tokio, because i dont want to style like in rust:

  read(x).after(read(y).after(read(z)))

@zah
Copy link
Contributor Author

zah commented May 30, 2018

@cheatfate, I know precisely how the compiler works, but you are not paying enough attention to what I'm saying:

At the moment this is still the heap-allocated environment of the closure iterator, but this can be attacked with further optimizations in the compiler (escape analysis for the closure objects, better support for closure/iterator monomorphism in the run-time code)

The style of the Rust API is irrelevant, it's the underlying run-time approach that's interesting.

@cheatfate
Copy link
Collaborator

@zah you know precisely how the compiler works, and i know precisely how async works.
The style of Rust API is consequence of using zero-cost abstractions, almost equal approach is made for javascript's Promise.
I dont like such approach, because it pretty hard to understand and use.

@dom96
Copy link

dom96 commented Jun 17, 2018

I haven't read all of the discussion so forgive me if I'm missing something, but I already have a few things I would like to note about the whole reasoning for this issue: "Current Problems".

Consider the following simple async proc:

proc terminateConnection(s: AsyncSocket) {.async.} =
 var myDisconnectMsg: array[X, byte]
 prepareDisconnect(myDisconnectMsg)
 var res = s.send(addr MyDisconnectMsg[], X) # oops, forgot to call await here 
 s.close()

It showcases two critical problems trigged by a simple omission of await in the code:

  • The socket will be closed prematurely.
  • The send operation will be working with bogus data.

I am against any mixing of pointers and async. If you're doing that then you are on your own and should bear the brunt of the consequences.

Separately I do recognise the problem and it similar to the problem of users not knowing better and using discard on a future (instead of asyncCheck). But I think the solution to this is straightforward: destructors. Once a future is out of scope a callback can be set on it to ensure an exception is raised and that it does not get lost. Of course, I'm waiting for destructors to be stable to implement that.

Once that works, the above code should fail with an exception.

I'll read the rest of this as soon as I can as I am open to alternative solutions. But from what skimming this has seemingly revealed, this is suggesting really big changes to fix this one relatively minor issue.

@dom96
Copy link

dom96 commented Jun 17, 2018

So I read the article as well and I must say that the "nursery" idea proposed can be easily implemented on top of async await. I don't see a reason to restrict async await when these nurseries can be implemented with no breaking changes (and very easily too from what I can tell).

@cheatfate
Copy link
Collaborator

@dom96, what result is expected in example you showed?

  1. Message was sent. Socket closed.
  2. Message was not sent. Error about socket closure raised.
  3. Message was not sent. Socket closed.

(3) will be happen on Unix, but on Windows there is a big chance of (1) and less chance for (3), everything depends on number of messages in sending queue. Also asyncdispatch2 already has procedures which hiding pointer/size usage. While it possible to avoid pointers for writing, its almost impossible to avoid pointers for reading.

And about destructors. We already trained and find ways to cheat/workaround GC. Destructors is almost equal thing, we will need to find ways to workaround/cheat destructors too. The only right way (in my opinion) is to introduce manual memory management with a little help from compiler (for example compiler can calculate max size of single allocated object). With this information you can implement/use more fastest memory managers (like memory pools, please not to be confused with memory regions, because memory pools using constant memory blocks). So i'm not waiting destructors, because this is one more crutch.

@dom96
Copy link

dom96 commented Jun 17, 2018

Why would (3) be a possibility? Does Windows just ignore a closed socket and report that it sent a message even though it didn't?

@cheatfate
Copy link
Collaborator

@dom96,
(1) can be happen when send() -> WSASend() completed immediately, so data was actually sent, but IOCP completion will not be happened, just because socket will be closed.
(3) can be happen when send() -> WSASend() not completed immediately, when socket get closed all IOCP information about it will be removed from system queue.

@zah zah changed the title [RFC] A more structured API for parallel I/O [RFC] APIs for parallel I/O with structural control flow Jun 20, 2018
@zah
Copy link
Contributor Author

zah commented Jun 29, 2018

In an offline discussion, I promised to explain how an async accept loop may work under the new scheme. It was pointed out that if the code is written in the style of the old APIs, enforced awaits will lead to accepting and processing only one connection at a time:

while true:
  var conn = await socket.accept()
  await conn.process() # while this is awaited, we are not accepting new connections

The solve this problem, you must use the new async support enabled in parallel blocks. In general, the parallel blocks provide semantics that very closely ressemble the style of programming used in the old async APIs with spawn being roughly similar to asyncCheck. When you stumble on a problem, you should always ask yourself if the escape hatch provided by parallel blocks in enough to solve it. Here is how the solution here looks like:

parallel:
  while true:
    var conn = await socket.accept()
    spawn conn.process() # this gets added to the list of outstanding async ops
                         # of the block and the processing continues immediately

The parallel block won't be exited until all outstanding operations are complete. In this sense, it can be argued that the new API provides an additional safety benefit over the ad-hoc processing used in the old async APIs. With the old APIs, a shutdown procedure won't do anything about the outstanding async tasks by default, while here we clearly decide whether we want to wait for all the operations to finish or whether we wan to cancel them by using break to exit the loop.

Please also note that all the optimization benefits of the new scheme are still realized, because any local variable declared before the parallel or within the parallel block is guaranteed to outlive all the processing of the async operations started in the block and this allows us to pass pointers to such variables to the said async operations (enabling efficient openarray and var parameters and allowing us to store the futures and their results on the stack).

Besides using spawn as in the example above, in parallel blocks you can also directly manipulate the returned future values to add completion callbacks and error handling that will be executed with the familiar semantics of the old APIs (see the the remarks in 4.5 regarding this).

@zah zah changed the title [RFC] APIs for parallel I/O with structural control flow [RFC] APIs for parallel I/O with structural control flow (a.k.a. Enforced Awaits) Feb 12, 2020
@zah zah changed the title [RFC] APIs for parallel I/O with structural control flow (a.k.a. Enforced Awaits) [RFC] Async I/O with structural control flow (a.k.a. Enforced Awaits) Feb 12, 2020
@iffy
Copy link
Contributor

iffy commented Jul 1, 2020

Ignorant, fly-by question: how much of this could be implemented as a package without change to stdlib? I'm interested in using a system like the Python Trio system, because I think (but don't know) that it would simplify some of my async code. Anything I could do to help?

@zah
Copy link
Contributor Author

zah commented Jul 1, 2020

A certain version of this can be implemented without any changes to the compiler. Depending on exactly how much integration with parallel blocks is desired (in other words whether you want to mix async I/O with computational tasks backed by a thread pool), some compiler support can improve the system (and it's more about enforcing safety than about expressiveness). If you want to invest time in this, I can help you with some guidance. I would suggest finding me on Discord (https://discord.gg/XRxWahP) or Gitter.

cheatfate added a commit that referenced this issue Jan 20, 2021
cheatfate added a commit that referenced this issue Feb 10, 2021
cheatfate added a commit that referenced this issue Feb 10, 2021
zah pushed a commit that referenced this issue Feb 18, 2021
arnetheduck added a commit that referenced this issue Nov 18, 2023
# This is the 1st commit message:

futures: sinkify and lentify

this avoids copies here and there throughout the pipeline - ie
`copyString` and friends can often be avoided when moving things into
and out of futures

annoyingly, one has to sprinkle the codebase liberally with `sink` and
`move` for the pipeline to work well - sink stuff _generally_ works
better in orc/arc

# This is the commit message #2:

sink test
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants